drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amansi...@apache.org
Subject [4/6] drill git commit: DRILL-5516: Limit memory usage for Hbase reader
Date Sat, 20 May 2017 14:26:03 GMT
DRILL-5516: Limit memory usage for Hbase reader

close apache/drill#839


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/7f98400f
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/7f98400f
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/7f98400f

Branch: refs/heads/master
Commit: 7f98400f949b04eb06415aeb3a3265693629371c
Parents: aa39c66
Author: Arina Ielchiieva <arina.yelchiyeva@gmail.com>
Authored: Mon May 15 15:51:02 2017 +0000
Committer: Aman Sinha <asinha@maprtech.com>
Committed: Fri May 19 10:05:26 2017 -0700

----------------------------------------------------------------------
 .../exec/store/hbase/HBaseRecordReader.java     | 41 ++++++++++++++++----
 1 file changed, 33 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/7f98400f/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
index b3a7039..3f308ce 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -59,6 +59,10 @@ import com.google.common.collect.Sets;
 public class HBaseRecordReader extends AbstractRecordReader implements DrillHBaseConstants
{
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseRecordReader.class);
 
+  // batch should not exceed this value to avoid OOM on a busy system
+  private static final int MAX_ALLOCATED_MEMORY_PER_BATCH = 64 * 1024 * 1024; // 64 mb in bytes
+
+  // batch size should not exceed max allowed record count
   private static final int TARGET_RECORD_COUNT = 4000;
 
   private OutputMutator outputMutator;
@@ -134,7 +138,7 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBaseConstants
   public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException
{
     this.operatorContext = context;
     this.outputMutator = output;
-    familyVectorMap = new HashMap<String, MapVector>();
+    familyVectorMap = new HashMap<>();
 
     try {
       hTable = connection.getTable(hbaseTableName);
@@ -187,8 +191,8 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBaseConstants
     }
 
     int rowCount = 0;
-    done:
-    for (; rowCount < TARGET_RECORD_COUNT; rowCount++) {
+    // if allocated memory for the first row is larger than allowed max in batch, it will be added anyway
+    do {
       Result result = null;
       final OperatorStats operatorStats = operatorContext == null ? null : operatorContext.getStats();
       try {
@@ -206,13 +210,17 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBaseConstants
         throw new DrillRuntimeException(e);
       }
       if (result == null) {
-        break done;
+        break;
       }
 
       // parse the result and populate the value vectors
       Cell[] cells = result.rawCells();
       if (rowKeyVector != null) {
-        rowKeyVector.getMutator().setSafe(rowCount, cells[0].getRowArray(), cells[0].getRowOffset(), cells[0].getRowLength());
+        rowKeyVector.getMutator().setSafe(
+            rowCount,
+            cells[0].getRowArray(),
+            cells[0].getRowOffset(),
+            cells[0].getRowLength());
       }
       if (!rowKeyOnly) {
         for (final Cell cell : cells) {
@@ -224,7 +232,8 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBaseConstants
           final int qualifierOffset = cell.getQualifierOffset();
           final int qualifierLength = cell.getQualifierLength();
           final byte[] qualifierArray = cell.getQualifierArray();
-          final NullableVarBinaryVector v = getOrCreateColumnVector(mv, new String(qualifierArray, qualifierOffset, qualifierLength));
+          final NullableVarBinaryVector v = getOrCreateColumnVector(mv,
+              new String(qualifierArray, qualifierOffset, qualifierLength));
 
           final int valueOffset = cell.getValueOffset();
           final int valueLength = cell.getValueLength();
@@ -232,7 +241,8 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBaseConstants
           v.getMutator().setSafe(rowCount, valueArray, valueOffset, valueLength);
         }
       }
-    }
+      rowCount++;
+    } while (canAddNewRow(rowCount));
 
     setOutputRowCount(rowCount);
     logger.debug("Took {} ms to get {} records", watch.elapsed(TimeUnit.MILLISECONDS), rowCount);
@@ -289,4 +299,19 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBaseConstants
       rowKeyVector.getMutator().setValueCount(count);
     }
   }
+
+  /**
+   * Checks if new row can be added in batch. Row can be added if:
+   * <ul>
+   *   <li>current row count does not exceed max allowed one</li>
+   *   <li>allocated memory does not exceed max allowed one</li>
+   * </ul>
+   *
+   * @param rowCount current row count
+   * @return true if new row can be added in batch, false otherwise
+   */
+  private boolean canAddNewRow(int rowCount) {
+    return rowCount < TARGET_RECORD_COUNT &&
+        operatorContext.getAllocator().getAllocatedMemory() < MAX_ALLOCATED_MEMORY_PER_BATCH;
+  }
 }


Mime
View raw message