drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sudhe...@apache.org
Subject [03/29] drill git commit: DRILL-5275: Sort spill is slow due to repeated allocations
Date Sat, 25 Feb 2017 07:17:56 GMT
DRILL-5275: Sort spill is slow due to repeated allocations

Rather than create a heap buffer per vector when writing and reading,
the revised code creates a single, shared buffer used for all I/O
within a particular container. This improves performance by reducing GC
and CPU costs during I/Os.

Move the I/O buffer and its read/write methods to the allocator

Allows the buffer to be shared. Especially in the sort, this is
important, as the sort may have many serializations open at once.

closes #754


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/6bc398fc
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/6bc398fc
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/6bc398fc

Branch: refs/heads/master
Commit: 6bc398fc55bcbf73f1f882de67f146927d1d09d0
Parents: 38f816a
Author: Paul Rogers <progers@maprtech.com>
Authored: Sun Feb 19 17:53:31 2017 -0800
Committer: Sudheesh Katkam <sudheesh@apache.org>
Committed: Fri Feb 24 18:41:48 2017 -0800

----------------------------------------------------------------------
 .../cache/VectorAccessibleSerializable.java     |  9 ++---
 .../apache/drill/exec/memory/BaseAllocator.java | 37 ++++++++++++++++++++
 .../drill/exec/memory/BufferAllocator.java      | 29 +++++++++++++++
 3 files changed, 71 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/6bc398fc/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
index e3bf5bd..89876af 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
@@ -90,6 +90,7 @@ public class VectorAccessibleSerializable extends AbstractStreamSerializable
{
    * @param input the InputStream to read from
    * @throws IOException
    */
+  @SuppressWarnings("resource")
   @Override
   public void readFromStream(InputStream input) throws IOException {
     final VectorContainer container = new VectorContainer();
@@ -112,7 +113,7 @@ public class VectorAccessibleSerializable extends AbstractStreamSerializable
{
       final DrillBuf buf = allocator.buffer(dataLength);
       final ValueVector vector;
       try {
-        buf.writeBytes(input, dataLength);
+        allocator.read(buf, input, dataLength);
         vector = TypeHelper.getNewVector(field, allocator);
         vector.load(metaData, buf);
       } finally {
@@ -136,6 +137,7 @@ public class VectorAccessibleSerializable extends AbstractStreamSerializable
{
    * @param output the OutputStream to write to
    * @throws IOException
    */
+  @SuppressWarnings("resource")
   @Override
   public void writeToStream(OutputStream output) throws IOException {
     Preconditions.checkNotNull(output);
@@ -159,7 +161,7 @@ public class VectorAccessibleSerializable extends AbstractStreamSerializable
{
 
       /* If we have a selection vector, dump it to file first */
       if (svBuf != null) {
-        svBuf.getBytes(0, output, svBuf.readableBytes());
+        allocator.write(svBuf, output);
         sv2.setBuffer(svBuf);
         svBuf.release(); // sv2 now owns the buffer
         sv2.setRecordCount(svCount);
@@ -168,8 +170,7 @@ public class VectorAccessibleSerializable extends AbstractStreamSerializable
{
       /* Dump the array of ByteBuf's associated with the value vectors */
       for (DrillBuf buf : incomingBuffers) {
                 /* dump the buffer into the OutputStream */
-        int bufLength = buf.readableBytes();
-        buf.getBytes(0, output, bufLength);
+        allocator.write(buf, output);
       }
 
       output.flush();

http://git-wip-us.apache.org/repos/asf/drill/blob/6bc398fc/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
index 1245e86..ba47998 100644
--- a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
+++ b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
@@ -21,6 +21,9 @@ import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.DrillBuf;
 import io.netty.buffer.UnsafeDirectLittleEndian;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.Arrays;
 import java.util.IdentityHashMap;
 import java.util.Set;
@@ -789,4 +792,38 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
   public static boolean isDebug() {
     return DEBUG;
   }
+
+  /**
+   * Disk I/O buffer used for all reads and writes of DrillBufs.
+   */
+
+  private byte ioBuffer[];
+
+  public byte[] getIOBuffer() {
+    if (ioBuffer == null) {
+      ioBuffer = new byte[32*1024];
+    }
+    return ioBuffer;
+  }
+
+  public void read(DrillBuf buf, InputStream in, int length) throws IOException {
+    buf.clear();
+
+    byte[] buffer = getIOBuffer();
+    for (int posn = 0; posn < length; posn += buffer.length) {
+      int len = Math.min(buffer.length, length - posn);
+      in.read(buffer, 0, len);
+      buf.writeBytes(buffer, 0, len);
+    }
+  }
+
+  public void write(DrillBuf buf, OutputStream out) throws IOException {
+    byte[] buffer = getIOBuffer();
+    int bufLength = buf.readableBytes();
+    for (int posn = 0; posn < bufLength; posn += buffer.length) {
+      int len = Math.min(buffer.length, bufLength - posn);
+      buf.getBytes(posn, buffer, 0, len);
+      out.write(buffer, 0, len);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/6bc398fc/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
index 64f7d86..3c5f57d 100644
--- a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
+++ b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
@@ -20,6 +20,10 @@ package org.apache.drill.exec.memory;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.DrillBuf;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.ops.BufferManager;
 
@@ -151,4 +155,29 @@ public interface BufferAllocator extends AutoCloseable {
    * a no-op.
    */
   public void assertOpen();
+
+  /**
+   * Write the contents of a DrillBuf to a stream. Use this method, rather
+   * than calling the DrillBuf.getBytes() method, because this method
+   * avoids repeated heap allocation for the intermediate heap buffer.
+   *
+   * @param buf the DrillBuf to write
+   * @param out the output stream
+   * @throws IOException if a write error occurs
+   */
+
+  public void write(DrillBuf buf, OutputStream out) throws IOException;
+
+  /**
+   * Read the contents of a DrillBuf from a stream. Use this method, rather
+   * than calling the DrillBuf.writeBytes() method, because this method
+   * avoids repeated heap allocation for the intermediate heap buffer.
+   *
+   * @param buf the buffer to read into, with space already allocated
+   * @param in input stream from which to read data
+   * @param length number of bytes to read
+   * @throws IOException if a read error occurs
+   */
+
+  public void read(DrillBuf buf, InputStream in, int length) throws IOException;
 }


Mime
View raw message