hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From te...@apache.org
Subject svn commit: r1360171 - /hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
Date Wed, 11 Jul 2012 13:39:29 GMT
Author: tedyu
Date: Wed Jul 11 13:39:29 2012
New Revision: 1360171

URL: http://svn.apache.org/viewvc?rev=1360171&view=rev
Log:
HBASE-6369 HTable is not closed in AggregationClient (binlijin)


Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java?rev=1360171&r1=1360170&r2=1360171&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java Wed Jul 11 13:39:29 2012
@@ -94,8 +94,6 @@ public class AggregationClient {
   public <R, S> R max(final byte[] tableName, final ColumnInterpreter<R, S> ci,
       final Scan scan) throws Throwable {
     validateParameters(scan);
-    HTable table = new HTable(conf, tableName);
-
     class MaxCallBack implements Batch.Callback<R> {
       R max = null;
 
@@ -109,13 +107,21 @@ public class AggregationClient {
       }
     }
     MaxCallBack aMaxCallBack = new MaxCallBack();
-    table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), scan
-        .getStopRow(), new Batch.Call<AggregateProtocol, R>() {
-      @Override
-      public R call(AggregateProtocol instance) throws IOException {
-        return instance.getMax(ci, scan);
+    HTable table = null;
+    try {
+      table = new HTable(conf, tableName);
+      table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(),
+          scan.getStopRow(), new Batch.Call<AggregateProtocol, R>() {
+            @Override
+            public R call(AggregateProtocol instance) throws IOException {
+              return instance.getMax(ci, scan);
+            }
+          }, aMaxCallBack);
+    } finally {
+      if (table != null) {
+        table.close();
       }
-    }, aMaxCallBack);
+    }
     return aMaxCallBack.getMax();
   }
 
@@ -158,16 +164,23 @@ public class AggregationClient {
         min = (min == null || (result != null && ci.compare(result, min) < 0)) ? result : min;
       }
     }
-    HTable table = new HTable(conf, tableName);
     MinCallBack minCallBack = new MinCallBack();
-    table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), scan
-        .getStopRow(), new Batch.Call<AggregateProtocol, R>() {
-
-      @Override
-      public R call(AggregateProtocol instance) throws IOException {
-        return instance.getMin(ci, scan);
+    HTable table = null;
+    try {
+      table = new HTable(conf, tableName);
+      table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(),
+          scan.getStopRow(), new Batch.Call<AggregateProtocol, R>() {
+
+            @Override
+            public R call(AggregateProtocol instance) throws IOException {
+              return instance.getMin(ci, scan);
+            }
+          }, minCallBack);
+    } finally {
+      if (table != null) {
+        table.close();
       }
-    }, minCallBack);
+    }
     log.debug("Min fom all regions is: " + minCallBack.getMinimum());
     return minCallBack.getMinimum();
   }
@@ -201,14 +214,21 @@ public class AggregationClient {
       }
     }
     RowNumCallback rowNum = new RowNumCallback();
-    HTable table = new HTable(conf, tableName);
-    table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), scan
-        .getStopRow(), new Batch.Call<AggregateProtocol, Long>() {
-      @Override
-      public Long call(AggregateProtocol instance) throws IOException {
-        return instance.getRowNum(ci, scan);
+    HTable table = null;
+    try {
+      table = new HTable(conf, tableName);
+      table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(),
+          scan.getStopRow(), new Batch.Call<AggregateProtocol, Long>() {
+            @Override
+            public Long call(AggregateProtocol instance) throws IOException {
+              return instance.getRowNum(ci, scan);
+            }
+          }, rowNum);
+    } finally {
+      if (table != null) {
+        table.close();
       }
-    }, rowNum);
+    }
     return rowNum.getRowNumCount();
   }
 
@@ -237,14 +257,21 @@ public class AggregationClient {
       }
     }
     SumCallBack sumCallBack = new SumCallBack();
-    HTable table = new HTable(conf, tableName);
-    table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), scan
-        .getStopRow(), new Batch.Call<AggregateProtocol, S>() {
-      @Override
-      public S call(AggregateProtocol instance) throws IOException {
-        return instance.getSum(ci, scan);
+    HTable table = null;
+    try {
+      table = new HTable(conf, tableName);
+      table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(),
+          scan.getStopRow(), new Batch.Call<AggregateProtocol, S>() {
+            @Override
+            public S call(AggregateProtocol instance) throws IOException {
+              return instance.getSum(ci, scan);
+            }
+          }, sumCallBack);
+    } finally {
+      if (table != null) {
+        table.close();
       }
-    }, sumCallBack);
+    }
     return sumCallBack.getSumResult();
   }
 
@@ -274,14 +301,23 @@ public class AggregationClient {
       }
     }
     AvgCallBack avgCallBack = new AvgCallBack();
-    HTable table = new HTable(conf, tableName);
-    table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), scan
-        .getStopRow(), new Batch.Call<AggregateProtocol, Pair<S, Long>>() {
-      @Override
-      public Pair<S, Long> call(AggregateProtocol instance) throws IOException {
-        return instance.getAvg(ci, scan);
+    HTable table = null;
+    try {
+      table = new HTable(conf, tableName);
+      table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(),
+          scan.getStopRow(),
+          new Batch.Call<AggregateProtocol, Pair<S, Long>>() {
+            @Override
+            public Pair<S, Long> call(AggregateProtocol instance)
+                throws IOException {
+              return instance.getAvg(ci, scan);
+            }
+          }, avgCallBack);
+    } finally {
+      if (table != null) {
+        table.close();
       }
-    }, avgCallBack);
+    }
     return avgCallBack.getAvgArgs();
   }
 
@@ -337,17 +373,24 @@ public class AggregationClient {
       }
     }
     StdCallback stdCallback = new StdCallback();
-    HTable table = new HTable(conf, tableName);
-    table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), scan
-        .getStopRow(),
-        new Batch.Call<AggregateProtocol, Pair<List<S>, Long>>() {
-          @Override
-          public Pair<List<S>, Long> call(AggregateProtocol instance)
-              throws IOException {
-            return instance.getStd(ci, scan);
-          }
+    HTable table = null;
+    try {
+      table = new HTable(conf, tableName);
+      table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(),
+          scan.getStopRow(),
+          new Batch.Call<AggregateProtocol, Pair<List<S>, Long>>() {
+            @Override
+            public Pair<List<S>, Long> call(AggregateProtocol instance)
+                throws IOException {
+              return instance.getStd(ci, scan);
+            }
 
-        }, stdCallback);
+          }, stdCallback);
+    } finally {
+      if (table != null) {
+        table.close();
+      }
+    }
     return stdCallback.getStdParams();
   }
 
@@ -412,17 +455,22 @@ public class AggregationClient {
       }
     }
     StdCallback stdCallback = new StdCallback();
-    HTable table = new HTable(conf, tableName);
-    table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), scan
-        .getStopRow(),
-        new Batch.Call<AggregateProtocol, List<S>>() {
-          @Override
-          public List<S> call(AggregateProtocol instance)
-              throws IOException {
-            return instance.getMedian(ci, scan);
-          }
+    HTable table = null;
+    try {
+      table = new HTable(conf, tableName);
+      table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(),
+          scan.getStopRow(), new Batch.Call<AggregateProtocol, List<S>>() {
+            @Override
+            public List<S> call(AggregateProtocol instance) throws IOException {
+              return instance.getMedian(ci, scan);
+            }
 
-        }, stdCallback);
+          }, stdCallback);
+    } finally {
+      if (table != null) {
+        table.close();
+      }
+    }
     return stdCallback.getMedianParams();
   }
 
@@ -464,20 +512,22 @@ public class AggregationClient {
     Scan scan2 = new Scan(scan);
     // inherit stop row from method parameter
     if (startRow != null) scan2.setStartRow(startRow);
-    HTable table = new HTable(conf, tableName);
-    int cacheSize = scan2.getCaching();
-    if (!scan2.getCacheBlocks() || scan2.getCaching() < 2) {
-      scan2.setCacheBlocks(true);
-      cacheSize = 5;
-      scan2.setCaching(cacheSize);
-    }
-    ResultScanner scanner = table.getScanner(scan2);
-    Result[] results = null;
-    byte[] qualifier = quals.pollFirst();
-    // qualifier for the weight column
-    byte[] weightQualifier = weighted ? quals.pollLast() : qualifier;
-    R value = null;
+    HTable table = null;
+    ResultScanner scanner = null;
     try {
+      table = new HTable(conf, tableName);
+      int cacheSize = scan2.getCaching();
+      if (!scan2.getCacheBlocks() || scan2.getCaching() < 2) {
+        scan2.setCacheBlocks(true);
+        cacheSize = 5;
+        scan2.setCaching(cacheSize);
+      }
+      scanner = table.getScanner(scan2);
+      Result[] results = null;
+      byte[] qualifier = quals.pollFirst();
+      // qualifier for the weight column
+      byte[] weightQualifier = weighted ? quals.pollLast() : qualifier;
+      R value = null;
       do {
         results = scanner.next(cacheSize);
         if (results != null && results.length > 0) {
@@ -495,11 +545,16 @@ public class AggregationClient {
             movingSumVal = newSumVal;
             kv = r.getColumnLatest(colFamily, qualifier);
             value = ci.getValue(colFamily, qualifier, kv);
+            }
           }
-        }
       } while (results != null && results.length > 0);
     } finally {
-      scanner.close();
+      if (scanner != null) {
+        scanner.close();
+      }
+      if (table != null) {
+        table.close();
+      }
     }
     return null;
   }



Mime
View raw message