drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From meh...@apache.org
Subject drill git commit: DRILL-3093: Close raw batch buffers in data collectors
Date Fri, 15 May 2015 02:21:32 GMT
Repository: drill
Updated Branches:
  refs/heads/master 16ef62851 -> 7f575df33


DRILL-3093: Close raw batch buffers in data collectors


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/7f575df3
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/7f575df3
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/7f575df3

Branch: refs/heads/master
Commit: 7f575df33b6cf553c69de011ff46efa69ba0cca4
Parents: 16ef628
Author: Mehant Baid <mehantr@gmail.com>
Authored: Thu May 14 17:55:48 2015 -0700
Committer: Mehant Baid <mehantr@gmail.com>
Committed: Thu May 14 19:20:19 2015 -0700

----------------------------------------------------------------------
 .../org/apache/drill/common/AutoCloseables.java   | 18 ++++++++++++++++++
 .../impl/mergereceiver/MergingRecordBatch.java    |  6 ------
 .../unorderedreceiver/UnorderedReceiverBatch.java |  1 -
 .../exec/record/RawFragmentBatchProvider.java     |  3 +--
 .../exec/work/batch/AbstractDataCollector.java    |  4 +++-
 .../drill/exec/work/batch/BaseRawBatchBuffer.java |  2 +-
 .../drill/exec/work/batch/DataCollector.java      |  2 +-
 .../drill/exec/work/batch/IncomingBuffers.java    |  7 +++----
 .../exec/work/batch/SpoolingRawBatchBuffer.java   |  5 +++--
 9 files changed, 30 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/7f575df3/common/src/main/java/org/apache/drill/common/AutoCloseables.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/AutoCloseables.java b/common/src/main/java/org/apache/drill/common/AutoCloseables.java
index fa1eb92..c080c52 100644
--- a/common/src/main/java/org/apache/drill/common/AutoCloseables.java
+++ b/common/src/main/java/org/apache/drill/common/AutoCloseables.java
@@ -42,4 +42,22 @@ public class AutoCloseables {
       logger.warn("Failure on close(): " + e);
     }
   }
+
+  public static void close(AutoCloseable[] ac) throws Exception {
+    Exception topLevelException = null;
+    for (AutoCloseable closeable : ac) {
+      try {
+        closeable.close();
+      } catch (Exception e) {
+        if (topLevelException == null) {
+          topLevelException = e;
+        } else {
+          topLevelException.addSuppressed(e);
+        }
+      }
+    }
+    if (topLevelException != null) {
+      throw topLevelException;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/7f575df3/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index 6da132b..baf9bda 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -738,12 +738,6 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
         }
       }
     }
-    if (fragProviders != null) {
-      for (final RawFragmentBatchProvider f : fragProviders) {
-        f.cleanup();
-      }
-    }
-
     super.close();
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/7f575df3/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
index 1498441..684f715 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
@@ -214,7 +214,6 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
   @Override
   public void close() {
     batchLoader.clear();
-    fragProvider.cleanup();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/7f575df3/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java
index 030785c..14db502 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java
@@ -21,9 +21,8 @@ import java.io.IOException;
 
 import org.apache.drill.exec.ops.FragmentContext;
 
-public interface RawFragmentBatchProvider {
+public interface RawFragmentBatchProvider extends AutoCloseable{
 
   public RawFragmentBatch getNext() throws IOException, InterruptedException;
   public void kill(FragmentContext context);
-  public void cleanup();
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/7f575df3/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
index 407c547..d52cb5d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerArray;
 
+import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.BitControl.Collector;
@@ -129,7 +130,8 @@ public abstract class AbstractDataCollector implements DataCollector{
   protected abstract RawBatchBuffer getBuffer(int minorFragmentId);
 
   @Override
-  public void close() {
+  public void close() throws Exception {
+    AutoCloseables.close(buffers);
   }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/7f575df3/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
index 5192e46..11b6cc8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
@@ -110,7 +110,7 @@ public abstract class BaseRawBatchBuffer<T> implements RawBatchBuffer {
 
 //  ## Add assertion that all acks have been sent. TODO
   @Override
-  public void cleanup() {
+  public void close() {
     if (!isTerminated() && context.shouldContinue()) {
       final String msg = String.format("Cleanup before finished. %d out of %d strams have finished", completedStreams(), fragmentCount);
       final IllegalStateException e = new IllegalStateException(msg);

http://git-wip-us.apache.org/repos/asf/drill/blob/7f575df3/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/DataCollector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/DataCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/DataCollector.java
index dc016be..de88d02 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/DataCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/DataCollector.java
@@ -27,5 +27,5 @@ interface DataCollector extends AutoCloseable {
   public int getOppositeMajorFragmentId();
   public RawBatchBuffer[] getBuffers();
   public int getTotalIncomingFragments();
-  public void close();
+  public void close() throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/7f575df3/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
index 1c8b066..b21c61d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.exec.exception.FragmentSetupException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.BitControl.Collector;
@@ -110,10 +111,8 @@ public class IncomingBuffers implements AutoCloseable {
   }
 
   @Override
-  public void close() {
-    for (DataCollector fragment : fragCounts.values()) {
-      fragment.close();
-    }
+  public void close() throws Exception {
+    AutoCloseables.close(fragCounts.values().toArray(new AutoCloseable[0]));
   }
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/7f575df3/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
index 1634982..cfe5b6b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
@@ -240,7 +240,8 @@ public class SpoolingRawBatchBuffer extends BaseRawBatchBuffer<SpoolingRawBatchB
     logger.debug("Got batch. Current buffer size: {}", bufferQueue.size());
   }
 
-  public void cleanup() {
+  @Override
+  public void close() {
     if (spooler != null) {
       spooler.terminate();
       while (spooler.isAlive()) {
@@ -270,7 +271,7 @@ public class SpoolingRawBatchBuffer extends BaseRawBatchBuffer<SpoolingRawBatchB
         logger.warn("Failed to delete temporary files", e);
       }
     }
-    super.cleanup();
+    super.close();
   }
 
   private class Spooler extends Thread {


Mime
View raw message