drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amansi...@apache.org
Subject [1/3] drill git commit: DRILL-5080: Memory-managed version of external sort
Date Mon, 13 Feb 2017 03:50:40 GMT
Repository: drill
Updated Branches:
  refs/heads/master c9a6ac4fc -> 300e9349a


http://git-wip-us.apache.org/repos/asf/drill/blob/300e9349/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierTemplate.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierTemplate.java
new file mode 100644
index 0000000..81856fa
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierTemplate.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.xsort.managed;
+
+import io.netty.buffer.DrillBuf;
+
+import java.io.IOException;
+import java.util.List;
+
+import javax.inject.Named;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.vector.AllocationHelper;
+
+public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier {
+//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PriorityQueueCopierTemplate.class);
+
+  private SelectionVector4 vector4;
+  private List<BatchGroup> batchGroups;
+  private VectorAccessible hyperBatch;
+  private VectorAccessible outgoing;
+  private int size;
+  private int queueSize = 0;
+
+  @Override
+  public void setup(FragmentContext context, BufferAllocator allocator, VectorAccessible
hyperBatch, List<BatchGroup> batchGroups,
+                    VectorAccessible outgoing) throws SchemaChangeException {
+    this.hyperBatch = hyperBatch;
+    this.batchGroups = batchGroups;
+    this.outgoing = outgoing;
+    this.size = batchGroups.size();
+
+    @SuppressWarnings("resource")
+    final DrillBuf drillBuf = allocator.buffer(4 * size);
+    vector4 = new SelectionVector4(drillBuf, size, Character.MAX_VALUE);
+    doSetup(context, hyperBatch, outgoing);
+
+    queueSize = 0;
+    for (int i = 0; i < size; i++) {
+      vector4.set(i, i, batchGroups.get(i).getNextIndex());
+      siftUp();
+      queueSize++;
+    }
+  }
+
+  @Override
+  public int next(int targetRecordCount) {
+    allocateVectors(targetRecordCount);
+    for (int outgoingIndex = 0; outgoingIndex < targetRecordCount; outgoingIndex++) {
+      if (queueSize == 0) {
+        return 0;
+      }
+      int compoundIndex = vector4.get(0);
+      int batch = compoundIndex >>> 16;
+      assert batch < batchGroups.size() : String.format("batch: %d batchGroups: %d", batch,
batchGroups.size());
+      doCopy(compoundIndex, outgoingIndex);
+      int nextIndex = batchGroups.get(batch).getNextIndex();
+      if (nextIndex < 0) {
+        vector4.set(0, vector4.get(--queueSize));
+      } else {
+        vector4.set(0, batch, nextIndex);
+      }
+      if (queueSize == 0) {
+        setValueCount(++outgoingIndex);
+        return outgoingIndex;
+      }
+      siftDown();
+    }
+    setValueCount(targetRecordCount);
+    return targetRecordCount;
+  }
+
+  private void setValueCount(int count) {
+    for (VectorWrapper<?> w: outgoing) {
+      w.getValueVector().getMutator().setValueCount(count);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    vector4.clear();
+    for (final VectorWrapper<?> w: outgoing) {
+      w.getValueVector().clear();
+    }
+    for (final VectorWrapper<?> w : hyperBatch) {
+      w.clear();
+    }
+
+    for (BatchGroup batchGroup : batchGroups) {
+      batchGroup.close();
+    }
+  }
+
+  private void siftUp() {
+    int p = queueSize;
+    while (p > 0) {
+      if (compare(p, (p - 1) / 2) < 0) {
+        swap(p, (p - 1) / 2);
+        p = (p - 1) / 2;
+      } else {
+        break;
+      }
+    }
+  }
+
+  private void allocateVectors(int targetRecordCount) {
+    for (VectorWrapper<?> w: outgoing) {
+      AllocationHelper.allocateNew(w.getValueVector(), targetRecordCount);
+    }
+  }
+
+  private void siftDown() {
+    int p = 0;
+    int next;
+    while (p * 2 + 1 < queueSize) { // While the current node has at least one child
+      if (p * 2 + 2 >= queueSize) { // if current node has only one child, then we only
look at it
+        next = p * 2 + 1;
+      } else {
+        if (compare(p * 2 + 1, p * 2 + 2) <= 0) {//if current node has two children, we
must first determine which one has higher priority
+          next = p * 2 + 1;
+        } else {
+          next = p * 2 + 2;
+        }
+      }
+      if (compare(p, next) > 0) { // compare current node to highest priority child and
swap if necessary
+        swap(p, next);
+        p = next;
+      } else {
+        break;
+      }
+    }
+  }
+
+  public void swap(int sv0, int sv1) {
+    int tmp = vector4.get(sv0);
+    vector4.set(sv0, vector4.get(sv1));
+    vector4.set(sv1, tmp);
+  }
+
+  public int compare(int leftIndex, int rightIndex) {
+    int sv1 = vector4.get(leftIndex);
+    int sv2 = vector4.get(rightIndex);
+    return doEval(sv1, sv2);
+  }
+
+  public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming")
VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing);
+  public abstract int doEval(@Named("leftIndex") int leftIndex, @Named("rightIndex") int
rightIndex);
+  public abstract void doCopy(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/300e9349/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index f8b4334..26a77ec 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,6 +17,8 @@
  */
 package org.apache.drill.exec.server.options;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -24,25 +26,22 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
 import org.apache.commons.collections.IteratorUtils;
 import org.apache.drill.common.config.LogicalPlanPersistence;
-import org.apache.drill.common.map.CaseInsensitiveMap;
 import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.map.CaseInsensitiveMap;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.compile.ClassCompilerSelector;
 import org.apache.drill.exec.compile.ClassTransformer;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.server.options.OptionValue.OptionType;
-import org.apache.drill.exec.server.options.TypeValidators.BooleanValidator;
 import org.apache.drill.exec.store.sys.PersistentStore;
 import org.apache.drill.exec.store.sys.PersistentStoreConfig;
 import org.apache.drill.exec.store.sys.PersistentStoreProvider;
 import org.apache.drill.exec.util.AssertionUtil;
 
-import static com.google.common.base.Preconditions.checkArgument;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 /**
  * {@link OptionManager} that holds options within {@link org.apache.drill.exec.server.DrillbitContext}.
@@ -163,7 +162,8 @@ public class SystemOptionManager extends BaseOptionManager implements
AutoClosea
       ExecConstants.IMPLICIT_FILEPATH_COLUMN_LABEL_VALIDATOR,
       ExecConstants.CODE_GEN_EXP_IN_METHOD_SIZE_VALIDATOR,
       ExecConstants.CREATE_PREPARE_STATEMENT_TIMEOUT_MILLIS_VALIDATOR,
-      ExecConstants.DYNAMIC_UDF_SUPPORT_ENABLED_VALIDATOR
+      ExecConstants.DYNAMIC_UDF_SUPPORT_ENABLED_VALIDATOR,
+      ExecConstants.EXTERNAL_SORT_DISABLE_MANAGED_OPTION
     };
     final Map<String, OptionValidator> tmp = new HashMap<>();
     for (final OptionValidator validator : validators) {

http://git-wip-us.apache.org/repos/asf/drill/blob/300e9349/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java
index 678167f..d06424e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -68,7 +68,11 @@ public class MemoryAllocationUtilities {
       logger.debug("Max sort alloc: {}", maxSortAlloc);
 
       for(final ExternalSort externalSort : sortList) {
-        externalSort.setMaxAllocation(maxSortAlloc);
+        // Ensure that the sort receives the minimum memory needed to make progress.
+        // Without this, the math might work out to allocate too little memory.
+
+        long alloc = Math.max(maxSortAlloc, externalSort.getInitialAllocation());
+        externalSort.setMaxAllocation(alloc);
       }
     }
     plan.getProperties().hasResourcePlan = true;

http://git-wip-us.apache.org/repos/asf/drill/blob/300e9349/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 735ba2f..1c702d7 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -103,10 +103,10 @@ drill.exec: {
     }
   },
   zk: {
-  connect: "localhost:2181",
-  root: "drill",
-  refresh: 500,
-  timeout: 5000,
+    connect: "localhost:2181",
+    root: "drill",
+    refresh: 500,
+    timeout: 5000,
     retry: {
       count: 7200,
       delay: 500
@@ -177,13 +177,47 @@ drill.exec: {
   sort: {
     purge.threshold : 1000,
     external: {
-      batch.size : 4000,
+      // Drill uses the managed External Sort Batch by default.
+      // Set this to true to use the legacy, unmanaged version.
+      // Disabled in the initial commit, to be enabled after
+      // tests are committed.
+      disable_managed: true
+      // Limit on the number of batches buffered in memory.
+      // Primarily for testing.
+      // 0 = unlimited
+      batch_limit: 0
+      // Limit on the amount of memory used for xsort. Overrides the
+      // value provided by Foreman. Primarily for testing.
+      // 0 = unlimited, Supports HOCON memory suffixes.
+      mem_limit: 0
+      // Limit on the number of spilled batches that can be merged in
+      // a single pass. Limits the number of open file handles.
+      // 0 = unlimited
+      merge_limit: 0
       spill: {
-        batch.size : 4000,
-        group.size : 40000,
-        threshold : 40000,
-        directories : [ "/tmp/drill/spill" ],
-        fs : "file:///"
+        // Deprecated for managed xsort; used only by legacy xsort
+        group.size: 40000,
+        // Deprecated for managed xsort; used only by legacy xsort
+        threshold: 40000,
+        // File system to use. Local file system by default.
+        fs: "file:///"
+        // List of directories to use. Directories are created
+        // if they do not exist.
+        directories: [ "/tmp/drill/spill" ],
+        // Size of the batches written to, and read from, the spill files.
+        // Determines the ratio of memory to input data size for a single-
+        // generation sort. Smaller values give larger ratios, but at a
+        // (high) cost of much greater disk seek times.
+        spill_batch_size = 8M,
+        // Preferred file size for "first-generation" spill files.
+        // Set large enough to get long, continuous writes, but not so
+        // large as to overwhelm a temp directory.
+        // Supports HOCON memory suffixes.
+        file_size: 256M,
+        // Size of the batch sent downstream from the sort operator during
+        // the merge phase. Don't change this unless you know what you are doing,
+        // larger sizes can result in memory fragmentation.
+        merge_batch_size = 16M
       }
     }
   },

http://git-wip-us.apache.org/repos/asf/drill/blob/300e9349/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestExternalSort.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestExternalSort.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestExternalSort.java
index 3e55d9d..52ebd57 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestExternalSort.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestExternalSort.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -19,6 +19,8 @@ package org.apache.drill.exec.physical.impl.xsort;
 
 import org.apache.drill.BaseTestQuery;
 import org.apache.drill.TestBuilder;
+import org.apache.drill.exec.ExecConstants;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import java.io.BufferedOutputStream;
@@ -28,28 +30,37 @@ import java.io.FileOutputStream;
 public class TestExternalSort extends BaseTestQuery {
 
   @Test
-  public void testNumericTypes() throws Exception {
+  public void testNumericTypesManaged() throws Exception {
+    testNumericTypes( false );
+  }
+
+  @Test
+  public void testNumericTypesLegacy() throws Exception {
+    testNumericTypes( true );
+  }
+
+  private void testNumericTypes(boolean testLegacy) throws Exception {
     final int record_count = 10000;
     String dfs_temp = getDfsTestTmpSchemaLocation();
     System.out.println(dfs_temp);
     File table_dir = new File(dfs_temp, "numericTypes");
     table_dir.mkdir();
-    BufferedOutputStream os = new BufferedOutputStream(new FileOutputStream(new File(table_dir,
"a.json")));
-    String format = "{ a : %d }%n";
-    for (int i = 0; i <= record_count; i += 2) {
-      os.write(String.format(format, i).getBytes());
+    try(BufferedOutputStream os = new BufferedOutputStream(new FileOutputStream(new File(table_dir,
"a.json")))) {
+      String format = "{ a : %d }%n";
+      for (int i = 0; i <= record_count; i += 2) {
+        os.write(String.format(format, i).getBytes());
+      }
     }
-    os.close();
-    os = new BufferedOutputStream(new FileOutputStream(new File(table_dir, "b.json")));
-    format = "{ a : %.2f }%n";
-    for (int i = 1; i <= record_count; i+=2) {
-      os.write(String.format(format, (float) i).getBytes());
+    try(BufferedOutputStream os = new BufferedOutputStream(new FileOutputStream(new File(table_dir,
"b.json")))) {
+      String format = "{ a : %.2f }%n";
+      for (int i = 1; i <= record_count; i+=2) {
+        os.write(String.format(format, (float) i).getBytes());
+      }
     }
-    os.close();
     String query = "select * from dfs_test.tmp.numericTypes order by a desc";
     TestBuilder builder = testBuilder()
             .sqlQuery(query)
-            .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type`
= true")
+            .optionSettingQueriesForTestQuery(getOptions(testLegacy))
             .ordered()
             .baselineColumns("a");
     for (int i = record_count; i >= 0;) {
@@ -61,30 +72,48 @@ public class TestExternalSort extends BaseTestQuery {
     builder.go();
   }
 
+  private String getOptions(boolean testLegacy) {
+    String options = "alter session set `exec.enable_union_type` = true";
+    options += ";alter session set `" + ExecConstants.EXTERNAL_SORT_DISABLE_MANAGED_OPTION.getOptionName()
+ "` = " +
+        Boolean.toString(testLegacy);
+    return options;
+  }
+
+  @Test
+  @Ignore("Schema changes are disabled in external sort")
+  public void testNumericAndStringTypesManaged() throws Exception {
+    testNumericAndStringTypes(false);
+  }
+
   @Test
-  public void testNumericAndStringTypes() throws Exception {
+  @Ignore("Schema changes are disabled in external sort")
+  public void testNumericAndStringTypesLegacy() throws Exception {
+    testNumericAndStringTypes(true);
+  }
+
+  private void testNumericAndStringTypes(boolean testLegacy) throws Exception {
     final int record_count = 10000;
     String dfs_temp = getDfsTestTmpSchemaLocation();
     System.out.println(dfs_temp);
     File table_dir = new File(dfs_temp, "numericAndStringTypes");
     table_dir.mkdir();
-    BufferedOutputStream os = new BufferedOutputStream(new FileOutputStream(new File(table_dir,
"a.json")));
-    String format = "{ a : %d }%n";
-    for (int i = 0; i <= record_count; i += 2) {
-      os.write(String.format(format, i).getBytes());
+    try (BufferedOutputStream os = new BufferedOutputStream(new FileOutputStream(new File(table_dir,
"a.json")))) {
+      String format = "{ a : %d }%n";
+      for (int i = 0; i <= record_count; i += 2) {
+        os.write(String.format(format, i).getBytes());
+      }
     }
-    os.close();
-    os = new BufferedOutputStream(new FileOutputStream(new File(table_dir, "b.json")));
-    format = "{ a : \"%05d\" }%n";
-    for (int i = 1; i <= record_count; i+=2) {
-      os.write(String.format(format, i).getBytes());
+    try (BufferedOutputStream os = new BufferedOutputStream(new FileOutputStream(new File(table_dir,
"b.json")))) {
+      String format = "{ a : \"%05d\" }%n";
+      for (int i = 1; i <= record_count; i+=2) {
+        os.write(String.format(format, i).getBytes());
+      }
     }
-    os.close();
     String query = "select * from dfs_test.tmp.numericAndStringTypes order by a desc";
     TestBuilder builder = testBuilder()
             .sqlQuery(query)
             .ordered()
-            .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type`
= true")
+            .optionSettingQueriesForTestQuery(getOptions(testLegacy))
             .baselineColumns("a");
     // Strings come first because order by is desc
     for (int i = record_count; i >= 0;) {
@@ -101,30 +130,40 @@ public class TestExternalSort extends BaseTestQuery {
   }
 
   @Test
-  public void testNewColumns() throws Exception {
+  public void testNewColumnsManaged() throws Exception {
+    testNewColumns(false);
+  }
+
+
+  @Test
+  public void testNewColumnsLegacy() throws Exception {
+    testNewColumns(true);
+  }
+
+  private void testNewColumns(boolean testLegacy) throws Exception {
     final int record_count = 10000;
     String dfs_temp = getDfsTestTmpSchemaLocation();
     System.out.println(dfs_temp);
     File table_dir = new File(dfs_temp, "newColumns");
     table_dir.mkdir();
-    BufferedOutputStream os = new BufferedOutputStream(new FileOutputStream(new File(table_dir,
"a.json")));
-    String format = "{ a : %d, b : %d }%n";
-    for (int i = 0; i <= record_count; i += 2) {
-      os.write(String.format(format, i, i).getBytes());
+    try (BufferedOutputStream os = new BufferedOutputStream(new FileOutputStream(new File(table_dir,
"a.json")))) {
+      String format = "{ a : %d, b : %d }%n";
+      for (int i = 0; i <= record_count; i += 2) {
+        os.write(String.format(format, i, i).getBytes());
+      }
     }
-    os.close();
-    os = new BufferedOutputStream(new FileOutputStream(new File(table_dir, "b.json")));
-    format = "{ a : %d, c : %d }%n";
-    for (int i = 1; i <= record_count; i+=2) {
-      os.write(String.format(format, i, i).getBytes());
+    try (BufferedOutputStream os = new BufferedOutputStream(new FileOutputStream(new File(table_dir,
"b.json")))) {
+      String format = "{ a : %d, c : %d }%n";
+      for (int i = 1; i <= record_count; i+=2) {
+        os.write(String.format(format, i, i).getBytes());
+      }
     }
-    os.close();
     String query = "select a, b, c from dfs_test.tmp.newColumns order by a desc";
 //    Test framework currently doesn't handle changing schema (i.e. new columns) on the client
side
     TestBuilder builder = testBuilder()
             .sqlQuery(query)
             .ordered()
-            .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type`
= true")
+            .optionSettingQueriesForTestQuery(getOptions(testLegacy))
             .baselineColumns("a", "b", "c");
     for (int i = record_count; i >= 0;) {
       builder.baselineValues((long) i, (long) i--, null);

http://git-wip-us.apache.org/repos/asf/drill/blob/300e9349/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
index 8c7e7ca..1245e86 100644
--- a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
+++ b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
@@ -40,6 +40,7 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
 
   public static final String DEBUG_ALLOCATOR = "drill.memory.debug.allocator";
 
+  @SuppressWarnings("unused")
   private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
   private static final int CHUNK_SIZE = AllocationManager.INNER_ALLOCATOR.getChunkSize();
 
@@ -98,9 +99,9 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
       historicalLog = null;
       childLedgers = null;
     }
-
   }
 
+  @Override
   public void assertOpen() {
     if (AssertionUtil.ASSERT_ENABLED) {
       if (isClosed) {
@@ -289,6 +290,7 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
       }
     }
 
+    @Override
     public boolean add(final int nBytes) {
       assertOpen();
 
@@ -310,6 +312,7 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
       return true;
     }
 
+    @Override
     public DrillBuf allocateBuffer() {
       assertOpen();
 
@@ -321,14 +324,17 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
       return drillBuf;
     }
 
+    @Override
     public int getSize() {
       return nBytes;
     }
 
+    @Override
     public boolean isUsed() {
       return used;
     }
 
+    @Override
     public boolean isClosed() {
       return closed;
     }
@@ -366,6 +372,7 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
       closed = true;
     }
 
+    @Override
     public boolean reserve(int nBytes) {
       assertOpen();
 
@@ -511,6 +518,7 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
 
   }
 
+  @Override
   public String toString() {
     final Verbosity verbosity = logger.isTraceEnabled() ? Verbosity.LOG_WITH_STACKTRACE
         : Verbosity.BASIC;
@@ -525,6 +533,7 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
    *
    * @return A Verbose string of current allocator state.
    */
+  @Override
   public String toVerboseString() {
     final StringBuilder sb = new StringBuilder();
     print(sb, 0, Verbosity.LOG_WITH_STACKTRACE);
@@ -542,7 +551,7 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
    *          An integer value.
    * @return The closest power of two of that value.
    */
-  static int nextPowerOfTwo(int val) {
+  public static int nextPowerOfTwo(int val) {
     int highestBit = Integer.highestOneBit(val);
     if (highestBit == val) {
       return val;


Mime
View raw message