drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [06/13] drill git commit: DRILL-4134: Allocator Improvements
Date Tue, 22 Dec 2015 15:06:29 GMT
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
----------------------------------------------------------------------
diff --git a/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java b/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
deleted file mode 100644
index 36bcacf..0000000
--- a/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
+++ /dev/null
@@ -1,408 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.memory;
-
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.DrillBuf;
-import io.netty.buffer.PooledByteBufAllocatorL;
-import io.netty.buffer.UnsafeDirectLittleEndian;
-
-import java.util.HashMap;
-import java.util.IdentityHashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.testing.ControlsInjector;
-import org.apache.drill.exec.testing.ControlsInjectorFactory;
-import org.apache.drill.exec.util.AssertionUtil;
-import org.apache.drill.exec.util.Pointer;
-
-public class TopLevelAllocator implements BufferAllocator {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TopLevelAllocator.class);
-  private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(TopLevelAllocator.class);
-  public static final String CHILD_BUFFER_INJECTION_SITE = "child.buffer";
-
-  public static long MAXIMUM_DIRECT_MEMORY;
-
-  private static final boolean ENABLE_ACCOUNTING = AssertionUtil.isAssertionsEnabled();
-  private final Map<ChildAllocator, StackTraceElement[]> childrenMap;
-  private final PooledByteBufAllocatorL innerAllocator = PooledByteBufAllocatorL.DEFAULT;
-  private final Accountor acct;
-  private final boolean errorOnLeak;
-  private final DrillBuf empty;
-
-  private final AtomicInteger idGenerator = new AtomicInteger(0);
-
-  private TopLevelAllocator(DrillConfig config, long maximumAllocation, boolean errorOnLeak){
-    MAXIMUM_DIRECT_MEMORY = maximumAllocation;
-    this.errorOnLeak = errorOnLeak;
-    this.acct = new Accountor(config, errorOnLeak, null, null, maximumAllocation, 0, true);
-    this.empty = DrillBuf.getEmpty(this, acct);
-    this.childrenMap = ENABLE_ACCOUNTING ? new IdentityHashMap<ChildAllocator, StackTraceElement[]>() : null;
-  }
-
-  TopLevelAllocator(DrillConfig config) {
-    this(config, Math.min(DrillConfig.getMaxDirectMemory(), config.getLong(ExecConstants.TOP_LEVEL_MAX_ALLOC)),
-        config.getBoolean(ExecConstants.ERROR_ON_MEMORY_LEAK)
-        );
-  }
-
-  @Override
-  public int getId() {
-    return idGenerator.incrementAndGet();
-  }
-
-  @Override
-  public boolean takeOwnership(DrillBuf buf) {
-    return buf.transferAccounting(acct);
-  }
-
-  @Override
-  public boolean shareOwnership(DrillBuf buf, Pointer<DrillBuf> out) {
-    DrillBuf b = new DrillBuf(this, acct, buf);
-    out.value = b;
-    return acct.transferIn(b, b.capacity());
-  }
-
-  @Override
-  public DrillBuf buffer(int min, int max) {
-    if (min == 0) {
-      return empty;
-    }
-    if(!acct.reserve(min)) {
-      throw new OutOfMemoryRuntimeException(createErrorMsg(this, min));
-    }
-
-    try {
-      UnsafeDirectLittleEndian buffer = innerAllocator.directBuffer(min, max);
-      DrillBuf wrapped = new DrillBuf(this, acct, buffer);
-      acct.reserved(min, wrapped);
-      return wrapped;
-    } catch (OutOfMemoryError e) {
-      if ("Direct buffer memory".equals(e.getMessage())) {
-        acct.release(min);
-        throw new OutOfMemoryRuntimeException(createErrorMsg(this, min), e);
-      } else {
-        throw e;
-      }
-    }
-  }
-
-  @Override
-  public DrillBuf buffer(int size) {
-    return buffer(size, size);
-  }
-
-  @Override
-  public long getAllocatedMemory() {
-    return acct.getAllocation();
-  }
-
-  @Override
-  public long getPeakMemoryAllocation() {
-    return acct.getPeakMemoryAllocation();
-  }
-
-  @Override
-  public ByteBufAllocator getUnderlyingAllocator() {
-    return innerAllocator;
-  }
-
-  @Override
-  public BufferAllocator newChildAllocator(AllocatorOwner allocatorOwner,
-      long initialReservation, long maximumReservation, int flags) {
-    return getChildAllocator(allocatorOwner.getFragmentContext(), initialReservation,
-        maximumReservation, (flags & BufferAllocator.F_LIMITING_ROOT) != 0);
-  }
-
-    @Override
-  public BufferAllocator getChildAllocator(FragmentContext context, long initialReservation,
-      long maximumReservation, boolean applyFragmentLimit) {
-    if(!acct.reserve(initialReservation)){
-      logger.debug(String.format("You attempted to create a new child allocator with initial reservation %d but only %d bytes of memory were available.", initialReservation, acct.getCapacity() - acct.getAllocation()));
-      throw new OutOfMemoryRuntimeException(
-          String
-              .format(
-                  "You attempted to create a new child allocator with initial reservation %d but only %d bytes of memory were available.",
-                  initialReservation, acct.getCapacity() - acct.getAllocation()));
-    }
-    logger.debug("New child allocator with initial reservation {}", initialReservation);
-    ChildAllocator allocator = new ChildAllocator(context, acct, maximumReservation, initialReservation, childrenMap, applyFragmentLimit);
-    if(ENABLE_ACCOUNTING){
-      childrenMap.put(allocator, Thread.currentThread().getStackTrace());
-    }
-
-    return allocator;
-  }
-
-  @Override
-  public void setFragmentLimit(long limit){
-    acct.setFragmentLimit(limit);
-  }
-
-  @Override
-  public long getFragmentLimit(){
-    return acct.getFragmentLimit();
-  }
-
-  @Override
-  public void close() {
-    if (ENABLE_ACCOUNTING) {
-      for (Entry<ChildAllocator, StackTraceElement[]> child : childrenMap.entrySet()) {
-        if (!child.getKey().isClosed()) {
-          StringBuilder sb = new StringBuilder();
-          StackTraceElement[] elements = child.getValue();
-          for (int i = 0; i < elements.length; i++) {
-            sb.append("\t\t");
-            sb.append(elements[i]);
-            sb.append("\n");
-          }
-          throw new IllegalStateException("Failure while trying to close allocator: Child level allocators not closed. Stack trace: \n" + sb);
-        }
-      }
-    }
-    acct.close();
-  }
-
-
-
-  @Override
-  public DrillBuf getEmpty() {
-    return empty;
-  }
-
-
-
-  private class ChildAllocator implements BufferAllocator {
-    private final DrillBuf empty;
-    private Accountor childAcct;
-    private Map<ChildAllocator, StackTraceElement[]> children = new HashMap<>();
-    private boolean closed = false;
-    private FragmentContext fragmentContext;
-    private Map<ChildAllocator, StackTraceElement[]> thisMap;
-
-    public ChildAllocator(FragmentContext context,
-                          Accountor parentAccountor,
-                          long max,
-                          long pre,
-                          Map<ChildAllocator,
-                          StackTraceElement[]> map,
-        boolean applyFragmentLimit) {
-      assert max >= pre;
-      DrillConfig drillConf = context != null ? context.getConfig() : null;
-      childAcct = new Accountor(drillConf, errorOnLeak, context, parentAccountor, max, pre, applyFragmentLimit);
-      this.fragmentContext=context;
-      thisMap = map;
-      this.empty = DrillBuf.getEmpty(this, childAcct);
-    }
-
-    @Override
-    public int getId() {
-      return idGenerator.incrementAndGet();
-    }
-
-    @Override
-    public boolean takeOwnership(DrillBuf buf) {
-      return buf.transferAccounting(childAcct);
-    }
-
-    @Override
-    public boolean shareOwnership(DrillBuf buf, Pointer<DrillBuf> out) {
-      DrillBuf b = new DrillBuf(this, acct, buf);
-      out.value = b;
-      return acct.transferIn(b, b.capacity());
-    }
-
-
-    @Override
-    public DrillBuf buffer(int size, int max) {
-      if (ENABLE_ACCOUNTING) {
-        injector.injectUnchecked(fragmentContext, CHILD_BUFFER_INJECTION_SITE);
-      }
-
-      if (size == 0) {
-        return empty;
-      }
-      if(!childAcct.reserve(size)) {
-        throw new OutOfMemoryRuntimeException(createErrorMsg(this, size));
-      }
-
-      try {
-        UnsafeDirectLittleEndian buffer = innerAllocator.directBuffer(size, max);
-        DrillBuf wrapped = new DrillBuf(this, childAcct, buffer);
-        childAcct.reserved(buffer.capacity(), wrapped);
-        return wrapped;
-      } catch (OutOfMemoryError e) {
-        if ("Direct buffer memory".equals(e.getMessage())) {
-          childAcct.release(size);
-          throw new OutOfMemoryRuntimeException(createErrorMsg(this, size), e);
-        } else {
-          throw e;
-        }
-      }
-    }
-
-    @Override
-    public DrillBuf buffer(int size) {
-      return buffer(size, size);
-    }
-
-    @Override
-    public ByteBufAllocator getUnderlyingAllocator() {
-      return innerAllocator;
-    }
-
-    @Override
-    public BufferAllocator newChildAllocator(AllocatorOwner allocatorOwner,
-        long initialReservation, long maximumReservation, int flags) {
-      return getChildAllocator(allocatorOwner.getFragmentContext(), initialReservation,
-          maximumReservation, (flags & BufferAllocator.F_LIMITING_ROOT) != 0);
-    }
-
-    @Override
-    public BufferAllocator getChildAllocator(FragmentContext context,
-        long initialReservation, long maximumReservation, boolean applyFragmentLimit) {
-      if (!childAcct.reserve(initialReservation)) {
-        throw new OutOfMemoryRuntimeException(
-            String
-                .format(
-                    "You attempted to create a new child allocator with initial reservation %d but only %d bytes of memory were available.",
-                    initialReservation, childAcct.getAvailable()));
-      }
-      logger.debug("New child allocator with initial reservation {}", initialReservation);
-      ChildAllocator newChildAllocator = new ChildAllocator(context, childAcct, maximumReservation, initialReservation, null, applyFragmentLimit);
-      this.children.put(newChildAllocator, Thread.currentThread().getStackTrace());
-      return newChildAllocator;
-    }
-
-    @Override
-    public AllocationReservation newReservation() {
-      return new PreAlloc(this, this.childAcct);
-    }
-
-    @Override
-    public void setFragmentLimit(long limit){
-      childAcct.setFragmentLimit(limit);
-    }
-
-    @Override
-    public long getFragmentLimit(){
-      return childAcct.getFragmentLimit();
-    }
-
-    @Override
-    public void close() {
-      if (ENABLE_ACCOUNTING) {
-        if (thisMap != null) {
-          thisMap.remove(this);
-        }
-        for (ChildAllocator child : children.keySet()) {
-          if (!child.isClosed()) {
-            StringBuilder sb = new StringBuilder();
-            StackTraceElement[] elements = children.get(child);
-            for (int i = 1; i < elements.length; i++) {
-              sb.append("\t\t");
-              sb.append(elements[i]);
-              sb.append("\n");
-            }
-
-
-            final FragmentHandle handle = fragmentContext.getHandle();
-            IllegalStateException e = new IllegalStateException(String.format(
-                    "Failure while trying to close child allocator: Child level allocators not closed. Fragment %d:%d. Stack trace: \n %s",
-                    handle.getMajorFragmentId(), handle.getMinorFragmentId(), sb.toString()));
-            if (errorOnLeak) {
-              throw e;
-            } else {
-              logger.warn("Memory leak.", e);
-            }
-          }
-        }
-      }
-      childAcct.close();
-      closed = true;
-    }
-
-    public boolean isClosed() {
-      return closed;
-    }
-
-    @Override
-    public long getAllocatedMemory() {
-      return childAcct.getAllocation();
-    }
-
-    @Override
-    public long getPeakMemoryAllocation() {
-      return childAcct.getPeakMemoryAllocation();
-    }
-
-    @Override
-    public DrillBuf getEmpty() {
-      return empty;
-    }
-  }
-
-  @Override
-  public AllocationReservation newReservation() {
-    return new PreAlloc(this, this.acct);
-  }
-
-  public class PreAlloc extends AllocationReservation {
-    int bytes = 0;
-    final Accountor acct;
-    final BufferAllocator allocator;
-    private PreAlloc(BufferAllocator allocator, Accountor acct) {
-      this.acct = acct;
-      this.allocator = allocator;
-    }
-
-    @Override
-    protected boolean reserve(int bytes) {
-      if (!acct.reserve(bytes)) {
-        return false;
-      }
-
-      this.bytes += bytes;
-      return true;
-    }
-
-    @Override
-    protected DrillBuf allocate(int bytes) {
-      assert this.bytes == bytes : "allocation size mismatch";
-      DrillBuf b = new DrillBuf(allocator, acct, innerAllocator.directBuffer(bytes, bytes));
-      acct.reserved(bytes, b);
-      return b;
-    }
-
-    @Override
-    protected void releaseReservation(int nBytes) {
-      acct.release(nBytes);
-    }
-  }
-
-  private static String createErrorMsg(final BufferAllocator allocator, final int size) {
-    return String.format("Unable to allocate buffer of size %d due to memory limit. Current allocation: %d",
-      size, allocator.getAllocatedMemory());
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/impl/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/memory/impl/src/main/resources/drill-module.conf b/exec/memory/impl/src/main/resources/drill-module.conf
deleted file mode 100644
index 593ef8e..0000000
--- a/exec/memory/impl/src/main/resources/drill-module.conf
+++ /dev/null
@@ -1,25 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements.  See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License.  You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-//  This file tells Drill to consider this module when class path scanning.
-//  This file can also include any supplementary configuration information.
-//  This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information.
-drill: {
-  memory: {
-    debug.error_on_leak: true,
-    top.max: 1000000000000
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/impl/src/test/java/org/apache/drill/exec/memory/TestEndianess.java
----------------------------------------------------------------------
diff --git a/exec/memory/impl/src/test/java/org/apache/drill/exec/memory/TestEndianess.java b/exec/memory/impl/src/test/java/org/apache/drill/exec/memory/TestEndianess.java
deleted file mode 100644
index 2028a23..0000000
--- a/exec/memory/impl/src/test/java/org/apache/drill/exec/memory/TestEndianess.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.memory;
-
-import static org.junit.Assert.assertEquals;
-import io.netty.buffer.ByteBuf;
-
-import org.apache.drill.common.DrillAutoCloseables;
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.test.DrillTest;
-import org.junit.Test;
-
-
-public class TestEndianess extends DrillTest {
-  @Test
-  public void testLittleEndian() {
-    final DrillConfig drillConfig = DrillConfig.create();
-    final BufferAllocator a = RootAllocatorFactory.newRoot(drillConfig);
-    final ByteBuf b = a.buffer(4);
-    b.setInt(0, 35);
-    assertEquals(b.getByte(0), 35);
-    assertEquals(b.getByte(1), 0);
-    assertEquals(b.getByte(2), 0);
-    assertEquals(b.getByte(3), 0);
-    b.release();
-    DrillAutoCloseables.closeNoChecked(a);
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/pom.xml
----------------------------------------------------------------------
diff --git a/exec/memory/pom.xml b/exec/memory/pom.xml
index 2738f34..2a98f1f 100644
--- a/exec/memory/pom.xml
+++ b/exec/memory/pom.xml
@@ -30,6 +30,5 @@
 
   <modules>
     <module>base</module>
-    <module>impl</module>
   </modules>
 </project>

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
index 1cacf9c..bc79677 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
@@ -36,7 +36,7 @@ public abstract class BasicClientWithConnection<T extends EnumLite, HANDSHAKE_SE
 
   public BasicClientWithConnection(RpcConfig rpcMapping, BufferAllocator alloc, EventLoopGroup eventLoopGroup, T handshakeType,
       Class<HANDSHAKE_RESPONSE> responseClass, Parser<HANDSHAKE_RESPONSE> handshakeParser, String connectionName) {
-    super(rpcMapping, alloc.getUnderlyingAllocator(), eventLoopGroup, handshakeType, responseClass, handshakeParser);
+    super(rpcMapping, alloc.getAsByteBufAllocator(), eventLoopGroup, handshakeType, responseClass, handshakeParser);
     this.alloc = alloc;
     this.connectionName = connectionName;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/vector/src/main/java/org/apache/drill/exec/expr/fn/impl/ByteFunctionHelpers.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/expr/fn/impl/ByteFunctionHelpers.java b/exec/vector/src/main/java/org/apache/drill/exec/expr/fn/impl/ByteFunctionHelpers.java
index 8a330b8..bd06ba5 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/expr/fn/impl/ByteFunctionHelpers.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/expr/fn/impl/ByteFunctionHelpers.java
@@ -29,8 +29,6 @@ import com.google.common.primitives.UnsignedLongs;
 public class ByteFunctionHelpers {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ByteFunctionHelpers.class);
 
-  private static final boolean BOUNDS_CHECKING_ENABLED = BoundsChecking.BOUNDS_CHECKING_ENABLED;
-
   /**
    * Helper function to check for equality of bytes in two DrillBuffers
    *
@@ -43,7 +41,7 @@ public class ByteFunctionHelpers {
    * @return 1 if left input is greater, -1 if left input is smaller, 0 otherwise
    */
   public static final int equal(final DrillBuf left, int lStart, int lEnd, final DrillBuf right, int rStart, int rEnd){
-    if(BOUNDS_CHECKING_ENABLED){
+    if (BoundsChecking.BOUNDS_CHECKING_ENABLED) {
       left.checkBytes(lStart, lEnd);
       right.checkBytes(rStart, rEnd);
     }
@@ -97,7 +95,7 @@ public class ByteFunctionHelpers {
    * @return 1 if left input is greater, -1 if left input is smaller, 0 otherwise
    */
   public static final int compare(final DrillBuf left, int lStart, int lEnd, final DrillBuf right, int rStart, int rEnd){
-    if(BOUNDS_CHECKING_ENABLED){
+    if (BoundsChecking.BOUNDS_CHECKING_ENABLED) {
       left.checkBytes(lStart, lEnd);
       right.checkBytes(rStart, rEnd);
     }
@@ -152,7 +150,7 @@ public class ByteFunctionHelpers {
    * @return 1 if left input is greater, -1 if left input is smaller, 0 otherwise
    */
   public static final int compare(final DrillBuf left, int lStart, int lEnd, final byte[] right, int rStart, final int rEnd) {
-    if(BOUNDS_CHECKING_ENABLED){
+    if (BoundsChecking.BOUNDS_CHECKING_ENABLED) {
       left.checkBytes(lStart, lEnd);
     }
     return memcmp(left.memoryAddress(), lStart, lEnd, right, rStart, rEnd);

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/vector/src/main/java/org/apache/drill/exec/util/DecimalUtility.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/util/DecimalUtility.java b/exec/vector/src/main/java/org/apache/drill/exec/util/DecimalUtility.java
index 0922b22..e8130ec 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/util/DecimalUtility.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/util/DecimalUtility.java
@@ -17,7 +17,9 @@
  */
 package org.apache.drill.exec.util;
 
+import io.netty.buffer.ByteBuf;
 import io.netty.buffer.DrillBuf;
+import io.netty.buffer.UnpooledByteBufAllocator;
 
 import java.math.BigDecimal;
 import java.math.BigInteger;
@@ -142,7 +144,7 @@ public class DecimalUtility extends CoreDecimalUtility{
         return str;
     }
 
-    public static BigDecimal getBigDecimalFromIntermediate(DrillBuf data, int startIndex, int nDecimalDigits, int scale) {
+  public static BigDecimal getBigDecimalFromIntermediate(ByteBuf data, int startIndex, int nDecimalDigits, int scale) {
 
         // In the intermediate representation we don't pad the scale with zeroes, so set truncate = false
         return getBigDecimalFromDrillBuf(data, startIndex, nDecimalDigits, scale, false);
@@ -172,7 +174,8 @@ public class DecimalUtility extends CoreDecimalUtility{
      * This function assumes that data is provided in a non-dense format
      * It works on both sparse and intermediate representations.
      */
-    public static BigDecimal getBigDecimalFromDrillBuf(DrillBuf data, int startIndex, int nDecimalDigits, int scale, boolean truncateScale) {
+  public static BigDecimal getBigDecimalFromDrillBuf(ByteBuf data, int startIndex, int nDecimalDigits, int scale,
+      boolean truncateScale) {
 
         // For sparse decimal type we have padded zeroes at the end, strip them while converting to BigDecimal.
         int actualDigits;
@@ -272,18 +275,24 @@ public class DecimalUtility extends CoreDecimalUtility{
         if (sign == true) {
             intermediateBytes[0] = (byte) (intermediateBytes[0] | 0x80);
         }
-        DrillBuf intermediate = data.getAllocator().buffer(intermediateBytes.length);
+
+    final ByteBuf intermediate = UnpooledByteBufAllocator.DEFAULT.buffer(intermediateBytes.length);
+    try {
         intermediate.setBytes(0, intermediateBytes);
 
-        BigDecimal ret = getBigDecimalFromIntermediate(intermediate, 0, nDecimalDigits + 1, scale);
-        intermediate.release();
-        return ret;
+      BigDecimal ret = getBigDecimalFromIntermediate(intermediate, 0, nDecimalDigits + 1, scale);
+      return ret;
+    } finally {
+      intermediate.release();
+    }
+
     }
 
     /*
      * Function converts the BigDecimal and stores it in out internal sparse representation
      */
-    public static void getSparseFromBigDecimal(BigDecimal input, DrillBuf data, int startIndex, int scale, int precision, int nDecimalDigits) {
+  public static void getSparseFromBigDecimal(BigDecimal input, ByteBuf data, int startIndex, int scale, int precision,
+      int nDecimalDigits) {
 
         // Initialize the buffer
         for (int i = 0; i < nDecimalDigits; i++) {

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java b/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java
index 5de0a07..0731975 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java
@@ -1099,16 +1099,6 @@ public final class BitData {
      * <code>optional bool isLastBatch = 7;</code>
      */
     boolean getIsLastBatch();
-
-    // optional bool isOutOfMemory = 8 [default = false];
-    /**
-     * <code>optional bool isOutOfMemory = 8 [default = false];</code>
-     */
-    boolean hasIsOutOfMemory();
-    /**
-     * <code>optional bool isOutOfMemory = 8 [default = false];</code>
-     */
-    boolean getIsOutOfMemory();
   }
   /**
    * Protobuf type {@code exec.bit.data.FragmentRecordBatch}
@@ -1228,11 +1218,6 @@ public final class BitData {
               isLastBatch_ = input.readBool();
               break;
             }
-            case 64: {
-              bitField0_ |= 0x00000040;
-              isOutOfMemory_ = input.readBool();
-              break;
-            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -1407,22 +1392,6 @@ public final class BitData {
       return isLastBatch_;
     }
 
-    // optional bool isOutOfMemory = 8 [default = false];
-    public static final int ISOUTOFMEMORY_FIELD_NUMBER = 8;
-    private boolean isOutOfMemory_;
-    /**
-     * <code>optional bool isOutOfMemory = 8 [default = false];</code>
-     */
-    public boolean hasIsOutOfMemory() {
-      return ((bitField0_ & 0x00000040) == 0x00000040);
-    }
-    /**
-     * <code>optional bool isOutOfMemory = 8 [default = false];</code>
-     */
-    public boolean getIsOutOfMemory() {
-      return isOutOfMemory_;
-    }
-
     private void initFields() {
       queryId_ = org.apache.drill.exec.proto.UserBitShared.QueryId.getDefaultInstance();
       receivingMajorFragmentId_ = 0;
@@ -1431,7 +1400,6 @@ public final class BitData {
       sendingMinorFragmentId_ = 0;
       def_ = org.apache.drill.exec.proto.UserBitShared.RecordBatchDef.getDefaultInstance();
       isLastBatch_ = false;
-      isOutOfMemory_ = false;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -1466,9 +1434,6 @@ public final class BitData {
       if (((bitField0_ & 0x00000020) == 0x00000020)) {
         output.writeBool(7, isLastBatch_);
       }
-      if (((bitField0_ & 0x00000040) == 0x00000040)) {
-        output.writeBool(8, isOutOfMemory_);
-      }
       getUnknownFields().writeTo(output);
     }
 
@@ -1511,10 +1476,6 @@ public final class BitData {
         size += com.google.protobuf.CodedOutputStream
           .computeBoolSize(7, isLastBatch_);
       }
-      if (((bitField0_ & 0x00000040) == 0x00000040)) {
-        size += com.google.protobuf.CodedOutputStream
-          .computeBoolSize(8, isOutOfMemory_);
-      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -1655,8 +1616,6 @@ public final class BitData {
         bitField0_ = (bitField0_ & ~0x00000020);
         isLastBatch_ = false;
         bitField0_ = (bitField0_ & ~0x00000040);
-        isOutOfMemory_ = false;
-        bitField0_ = (bitField0_ & ~0x00000080);
         return this;
       }
 
@@ -1722,10 +1681,6 @@ public final class BitData {
           to_bitField0_ |= 0x00000020;
         }
         result.isLastBatch_ = isLastBatch_;
-        if (((from_bitField0_ & 0x00000080) == 0x00000080)) {
-          to_bitField0_ |= 0x00000040;
-        }
-        result.isOutOfMemory_ = isOutOfMemory_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -1770,9 +1725,6 @@ public final class BitData {
         if (other.hasIsLastBatch()) {
           setIsLastBatch(other.getIsLastBatch());
         }
-        if (other.hasIsOutOfMemory()) {
-          setIsOutOfMemory(other.getIsOutOfMemory());
-        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -2232,39 +2184,6 @@ public final class BitData {
         return this;
       }
 
-      // optional bool isOutOfMemory = 8 [default = false];
-      private boolean isOutOfMemory_ ;
-      /**
-       * <code>optional bool isOutOfMemory = 8 [default = false];</code>
-       */
-      public boolean hasIsOutOfMemory() {
-        return ((bitField0_ & 0x00000080) == 0x00000080);
-      }
-      /**
-       * <code>optional bool isOutOfMemory = 8 [default = false];</code>
-       */
-      public boolean getIsOutOfMemory() {
-        return isOutOfMemory_;
-      }
-      /**
-       * <code>optional bool isOutOfMemory = 8 [default = false];</code>
-       */
-      public Builder setIsOutOfMemory(boolean value) {
-        bitField0_ |= 0x00000080;
-        isOutOfMemory_ = value;
-        onChanged();
-        return this;
-      }
-      /**
-       * <code>optional bool isOutOfMemory = 8 [default = false];</code>
-       */
-      public Builder clearIsOutOfMemory() {
-        bitField0_ = (bitField0_ & ~0x00000080);
-        isOutOfMemory_ = false;
-        onChanged();
-        return this;
-      }
-
       // @@protoc_insertion_point(builder_scope:exec.bit.data.FragmentRecordBatch)
     }
 
@@ -2305,18 +2224,17 @@ public final class BitData {
       "itShared.proto\"]\n\022BitClientHandshake\022\023\n\013" +
       "rpc_version\030\001 \001(\005\0222\n\007channel\030\002 \001(\0162\027.exe" +
       "c.shared.RpcChannel:\010BIT_DATA\")\n\022BitServ" +
-      "erHandshake\022\023\n\013rpc_version\030\001 \001(\005\"\252\002\n\023Fra" +
+      "erHandshake\022\023\n\013rpc_version\030\001 \001(\005\"\214\002\n\023Fra" +
       "gmentRecordBatch\022&\n\010query_id\030\001 \001(\0132\024.exe" +
       "c.shared.QueryId\022#\n\033receiving_major_frag" +
       "ment_id\030\002 \001(\005\022#\n\033receiving_minor_fragmen" +
       "t_id\030\003 \003(\005\022!\n\031sending_major_fragment_id\030",
       "\004 \001(\005\022!\n\031sending_minor_fragment_id\030\005 \001(\005" +
       "\022(\n\003def\030\006 \001(\0132\033.exec.shared.RecordBatchD" +
-      "ef\022\023\n\013isLastBatch\030\007 \001(\010\022\034\n\risOutOfMemory" +
-      "\030\010 \001(\010:\005false*D\n\007RpcType\022\r\n\tHANDSHAKE\020\000\022" +
-      "\007\n\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\024\n\020REQ_RECORD_BATC" +
-      "H\020\003B(\n\033org.apache.drill.exec.protoB\007BitD" +
-      "ataH\001"
+      "ef\022\023\n\013isLastBatch\030\007 \001(\010*D\n\007RpcType\022\r\n\tHA" +
+      "NDSHAKE\020\000\022\007\n\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\024\n\020REQ_R" +
+      "ECORD_BATCH\020\003B(\n\033org.apache.drill.exec.p" +
+      "rotoB\007BitDataH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -2340,7 +2258,7 @@ public final class BitData {
           internal_static_exec_bit_data_FragmentRecordBatch_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_exec_bit_data_FragmentRecordBatch_descriptor,
-              new java.lang.String[] { "QueryId", "ReceivingMajorFragmentId", "ReceivingMinorFragmentId", "SendingMajorFragmentId", "SendingMinorFragmentId", "Def", "IsLastBatch", "IsOutOfMemory", });
+              new java.lang.String[] { "QueryId", "ReceivingMajorFragmentId", "ReceivingMinorFragmentId", "SendingMajorFragmentId", "SendingMinorFragmentId", "Def", "IsLastBatch", });
           return null;
         }
       };

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitData.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitData.java b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitData.java
index 9803079..5684daf 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitData.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitData.java
@@ -280,8 +280,6 @@ public final class SchemaBitData
 
                 if(message.hasIsLastBatch())
                     output.writeBool(7, message.getIsLastBatch(), false);
-                if(message.hasIsOutOfMemory())
-                    output.writeBool(8, message.getIsOutOfMemory(), false);
             }
             public boolean isInitialized(org.apache.drill.exec.proto.BitData.FragmentRecordBatch message)
             {
@@ -344,9 +342,6 @@ public final class SchemaBitData
                         case 7:
                             builder.setIsLastBatch(input.readBool());
                             break;
-                        case 8:
-                            builder.setIsOutOfMemory(input.readBool());
-                            break;
                         default:
                             input.handleUnknownField(number, this);
                     }
@@ -394,7 +389,6 @@ public final class SchemaBitData
                 case 5: return "sendingMinorFragmentId";
                 case 6: return "def";
                 case 7: return "isLastBatch";
-                case 8: return "isOutOfMemory";
                 default: return null;
             }
         }
@@ -413,7 +407,6 @@ public final class SchemaBitData
             fieldMap.put("sendingMinorFragmentId", 5);
             fieldMap.put("def", 6);
             fieldMap.put("isLastBatch", 7);
-            fieldMap.put("isOutOfMemory", 8);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/protocol/src/main/java/org/apache/drill/exec/proto/beans/FragmentRecordBatch.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/FragmentRecordBatch.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/FragmentRecordBatch.java
index 61689f3..4b32361 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/FragmentRecordBatch.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/FragmentRecordBatch.java
@@ -48,7 +48,6 @@ public final class FragmentRecordBatch implements Externalizable, Message<Fragme
 
     static final FragmentRecordBatch DEFAULT_INSTANCE = new FragmentRecordBatch();
 
-    static final Boolean DEFAULT_IS_OUT_OF_MEMORY = new Boolean(false);
     
     private QueryId queryId;
     private int receivingMajorFragmentId;
@@ -57,7 +56,6 @@ public final class FragmentRecordBatch implements Externalizable, Message<Fragme
     private int sendingMinorFragmentId;
     private RecordBatchDef def;
     private Boolean isLastBatch;
-    private Boolean isOutOfMemory = DEFAULT_IS_OUT_OF_MEMORY;
 
     public FragmentRecordBatch()
     {
@@ -157,19 +155,6 @@ public final class FragmentRecordBatch implements Externalizable, Message<Fragme
         return this;
     }
 
-    // isOutOfMemory
-
-    public Boolean getIsOutOfMemory()
-    {
-        return isOutOfMemory;
-    }
-
-    public FragmentRecordBatch setIsOutOfMemory(Boolean isOutOfMemory)
-    {
-        this.isOutOfMemory = isOutOfMemory;
-        return this;
-    }
-
     // java serialization
 
     public void readExternal(ObjectInput in) throws IOException
@@ -249,9 +234,6 @@ public final class FragmentRecordBatch implements Externalizable, Message<Fragme
                 case 7:
                     message.isLastBatch = input.readBool();
                     break;
-                case 8:
-                    message.isOutOfMemory = input.readBool();
-                    break;
                 default:
                     input.handleUnknownField(number, this);
             }   
@@ -289,9 +271,6 @@ public final class FragmentRecordBatch implements Externalizable, Message<Fragme
 
         if(message.isLastBatch != null)
             output.writeBool(7, message.isLastBatch, false);
-
-        if(message.isOutOfMemory != null && message.isOutOfMemory != DEFAULT_IS_OUT_OF_MEMORY)
-            output.writeBool(8, message.isOutOfMemory, false);
     }
 
     public String getFieldName(int number)
@@ -305,7 +284,6 @@ public final class FragmentRecordBatch implements Externalizable, Message<Fragme
             case 5: return "sendingMinorFragmentId";
             case 6: return "def";
             case 7: return "isLastBatch";
-            case 8: return "isOutOfMemory";
             default: return null;
         }
     }
@@ -326,7 +304,6 @@ public final class FragmentRecordBatch implements Externalizable, Message<Fragme
         __fieldMap.put("sendingMinorFragmentId", 5);
         __fieldMap.put("def", 6);
         __fieldMap.put("isLastBatch", 7);
-        __fieldMap.put("isOutOfMemory", 8);
     }
     
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/protocol/src/main/protobuf/BitData.proto
----------------------------------------------------------------------
diff --git a/protocol/src/main/protobuf/BitData.proto b/protocol/src/main/protobuf/BitData.proto
index 2b76ce0..8724c4f 100644
--- a/protocol/src/main/protobuf/BitData.proto
+++ b/protocol/src/main/protobuf/BitData.proto
@@ -32,5 +32,4 @@ message FragmentRecordBatch{
   optional int32 sending_minor_fragment_id = 5;
   optional exec.shared.RecordBatchDef def = 6;
   optional bool isLastBatch = 7;
-  optional bool isOutOfMemory = 8 [ default = false ];
 }


Mime
View raw message