hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ga...@apache.org
Subject hbase git commit: HBASE-15773 Improvements to CellCounter job
Date Fri, 06 May 2016 18:18:42 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1 0964884b9 -> e44c36035


HBASE-15773 Improvements to CellCounter job


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e44c3603
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e44c3603
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e44c3603

Branch: refs/heads/branch-1
Commit: e44c3603504972d00a3be9a6d6cd296bd5734b87
Parents: 0964884
Author: Gary Helmling <garyh@apache.org>
Authored: Thu May 5 12:40:47 2016 -0700
Committer: Gary Helmling <garyh@apache.org>
Committed: Fri May 6 11:18:24 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/mapreduce/CellCounter.java     | 124 ++++++++++++-------
 .../hbase/mapreduce/TableInputFormat.java       |  88 +++++++------
 2 files changed, 129 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/e44c3603/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java
index b67932f..09290fd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java
@@ -88,7 +88,30 @@ public class CellCounter {
      * Counter enumeration to count the actual rows.
      */
     public static enum Counters {
-      ROWS
+      ROWS,
+      CELLS
+    }
+
+    private Configuration conf;
+    private String separator;
+
+    // state of current row, family, column needs to persist across map() invocations
+    // in order to properly handle scanner batching, where a single qualifier may have too
+    // many versions for a single map() call
+    private byte[] lastRow;
+    private String currentRowKey;
+    byte[] currentFamily = null;
+    String currentFamilyName = null;
+    byte[] currentQualifier = null;
+    // family + qualifier
+    String currentQualifierName = null;
+    // rowkey + family + qualifier
+    String currentRowQualifierName = null;
+
+    @Override
+    protected void setup(Context context) throws IOException, InterruptedException {
+      conf = context.getConfiguration();
+      separator = conf.get("ReportSeparator",":");
     }
 
     /**
@@ -108,48 +131,45 @@ public class CellCounter {
         throws IOException {
       Preconditions.checkState(values != null,
           "values passed to the map is null");
-      String currentFamilyName = null;
-      String currentQualifierName = null;
-      String currentRowKey = null;
-      Configuration config = context.getConfiguration();
-      String separator = config.get("ReportSeparator",":");
-      try {
-        context.getCounter(Counters.ROWS).increment(1);
-        context.write(new Text("Total ROWS"), new IntWritable(1));
 
-        for (Cell value : values.listCells()) {
-          currentRowKey = Bytes.toStringBinary(CellUtil.cloneRow(value));
-          String thisRowFamilyName = Bytes.toStringBinary(CellUtil.cloneFamily(value));
-          if (!thisRowFamilyName.equals(currentFamilyName)) {
-            currentFamilyName = thisRowFamilyName;
-            context.getCounter("CF", thisRowFamilyName).increment(1);
-            if (1 == context.getCounter("CF", thisRowFamilyName).getValue()) {
-              context.write(new Text("Total Families Across all Rows"), new IntWritable(1));
-              context.write(new Text(thisRowFamilyName), new IntWritable(1));
+      try {
+        byte[] currentRow = values.getRow();
+        if (lastRow == null || !Bytes.equals(lastRow, currentRow)) {
+          lastRow = currentRow;
+          currentRowKey = Bytes.toStringBinary(currentRow);
+          currentFamily = null;
+          currentQualifier = null;
+          context.getCounter(Counters.ROWS).increment(1);
+          context.write(new Text("Total ROWS"), new IntWritable(1));
+        }
+        if (!values.isEmpty()) {
+          int cellCount = 0;
+          for (Cell value : values.listCells()) {
+            cellCount++;
+            if (currentFamily == null || !CellUtil.matchingFamily(value, currentFamily)) {
+              currentFamily = CellUtil.cloneFamily(value);
+              currentFamilyName = Bytes.toStringBinary(currentFamily);
+              currentQualifier = null;
+              context.getCounter("CF", currentFamilyName).increment(1);
+              if (1 == context.getCounter("CF", currentFamilyName).getValue()) {
+                context.write(new Text("Total Families Across all Rows"), new IntWritable(1));
+                context.write(new Text(currentFamily), new IntWritable(1));
+              }
             }
-          }
-          String thisRowQualifierName = thisRowFamilyName + separator
-              + Bytes.toStringBinary(CellUtil.cloneQualifier(value));
-          if (!thisRowQualifierName.equals(currentQualifierName)) {
-            currentQualifierName = thisRowQualifierName;
-            context.getCounter("CFQL", thisRowQualifierName).increment(1);
-            context.write(new Text("Total Qualifiers across all Rows"),
-              new IntWritable(1));
-            context.write(new Text(thisRowQualifierName), new IntWritable(1));
-            // Intialize versions
-            context.getCounter("QL_VERSIONS", currentRowKey + separator +
-              thisRowQualifierName).increment(1);
-            context.write(new Text(currentRowKey + separator
-                + thisRowQualifierName + "_Versions"), new IntWritable(1));
+            if (currentQualifier == null || !CellUtil.matchingQualifier(value, currentQualifier)) {
+              currentQualifier = CellUtil.cloneQualifier(value);
+              currentQualifierName = currentFamilyName + separator +
+                  Bytes.toStringBinary(currentQualifier);
+              currentRowQualifierName = currentRowKey + separator + currentQualifierName;
 
-          } else {
+              context.write(new Text("Total Qualifiers across all Rows"),
+                  new IntWritable(1));
+              context.write(new Text(currentQualifierName), new IntWritable(1));
+            }
             // Increment versions
-            currentQualifierName = thisRowQualifierName;
-            context.getCounter("QL_VERSIONS", currentRowKey + separator +
-              thisRowQualifierName).increment(1);
-            context.write(new Text(currentRowKey + separator
-                + thisRowQualifierName + "_Versions"), new IntWritable(1));
+            context.write(new Text(currentRowQualifierName + "_Versions"), new IntWritable(1));
           }
+          context.getCounter(Counters.CELLS).increment(cellCount);
         }
       } catch (InterruptedException e) {
         e.printStackTrace();
@@ -203,15 +223,16 @@ public class CellCounter {
     return job;
   }
 
-  private static Scan getConfiguredScanForJob(Configuration conf, String[] args) throws IOException {
-    Scan s = new Scan();
+  private static Scan getConfiguredScanForJob(Configuration conf, String[] args)
+      throws IOException {
+    // create scan with any properties set from TableInputFormat
+    Scan s = TableInputFormat.createScanFromConfiguration(conf);
     // Set Scan Versions
-    s.setMaxVersions(Integer.MAX_VALUE);
-    s.setCacheBlocks(false);
-    // Set Scan Column Family
-    if (conf.get(TableInputFormat.SCAN_COLUMN_FAMILY) != null) {
-      s.addFamily(Bytes.toBytes(conf.get(TableInputFormat.SCAN_COLUMN_FAMILY)));
+    if (conf.get(TableInputFormat.SCAN_MAXVERSIONS) == null) {
+      // default to all versions unless explicitly set
+      s.setMaxVersions(Integer.MAX_VALUE);
     }
+    s.setCacheBlocks(false);
     // Set RowFilter or Prefix Filter if applicable.
     Filter rowFilter = getRowFilter(args);
     if (rowFilter!= null) {
@@ -278,9 +299,18 @@ public class CellCounter {
       System.err.println("       <tablename> <outputDir> <reportSeparator> [^[regex pattern] or " +
         "[Prefix] for row filter]] --starttime=[starttime] --endtime=[endtime]");
       System.err.println("  Note: -D properties will be applied to the conf used. ");
-      System.err.println("  Additionally, the following SCAN properties can be specified");
-      System.err.println("  to get fine grained control on what is counted..");
+      System.err.println("  Additionally, all of the SCAN properties from TableInputFormat");
+      System.err.println("  can be specified to get fine grained control on what is counted..");
+      System.err.println("   -D " + TableInputFormat.SCAN_ROW_START + "=<rowkey>");
+      System.err.println("   -D " + TableInputFormat.SCAN_ROW_STOP + "=<rowkey>");
+      System.err.println("   -D " + TableInputFormat.SCAN_COLUMNS + "=\"<col1> <col2>...\"");
       System.err.println("   -D " + TableInputFormat.SCAN_COLUMN_FAMILY + "=<familyName>");
+      System.err.println("   -D " + TableInputFormat.SCAN_TIMESTAMP + "=<timestamp>");
+      System.err.println("   -D " + TableInputFormat.SCAN_TIMERANGE_START + "=<timestamp>");
+      System.err.println("   -D " + TableInputFormat.SCAN_TIMERANGE_END + "=<timestamp>");
+      System.err.println("   -D " + TableInputFormat.SCAN_MAXVERSIONS + "=<count>");
+      System.err.println("   -D " + TableInputFormat.SCAN_CACHEDROWS + "=<count>");
+      System.err.println("   -D " + TableInputFormat.SCAN_BATCHSIZE + "=<count>");
       System.err.println(" <reportSeparator> parameter can be used to override the default report separator " +
           "string : used to separate the rowId/column family name and qualifier name.");
       System.err.println(" [^[regex pattern] or [Prefix] parameter can be used to limit the cell counter count " +

http://git-wip-us.apache.org/repos/asf/hbase/blob/e44c3603/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
index ebeb158..be20d90 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
@@ -126,54 +126,70 @@ implements Configurable {
       }
     } else {
       try {
-        scan = new Scan();
+        scan = createScanFromConfiguration(conf);
+      } catch (Exception e) {
+          LOG.error(StringUtils.stringifyException(e));
+      }
+    }
 
-        if (conf.get(SCAN_ROW_START) != null) {
-          scan.setStartRow(Bytes.toBytesBinary(conf.get(SCAN_ROW_START)));
-        }
+    setScan(scan);
+  }
 
-        if (conf.get(SCAN_ROW_STOP) != null) {
-          scan.setStopRow(Bytes.toBytesBinary(conf.get(SCAN_ROW_STOP)));
-        }
+  /**
+   * Sets up a {@link Scan} instance, applying settings from the configuration property
+   * constants defined in {@code TableInputFormat}.  This allows specifying things such as:
+   * <ul>
+   *   <li>start and stop rows</li>
+   *   <li>column qualifiers or families</li>
+   *   <li>timestamps or timerange</li>
+   *   <li>scanner caching and batch size</li>
+   * </ul>
+   */
+  public static Scan createScanFromConfiguration(Configuration conf) throws IOException {
+    Scan scan = new Scan();
 
-        if (conf.get(SCAN_COLUMNS) != null) {
-          addColumns(scan, conf.get(SCAN_COLUMNS));
-        }
+    if (conf.get(SCAN_ROW_START) != null) {
+      scan.setStartRow(Bytes.toBytesBinary(conf.get(SCAN_ROW_START)));
+    }
 
-        if (conf.get(SCAN_COLUMN_FAMILY) != null) {
-          scan.addFamily(Bytes.toBytes(conf.get(SCAN_COLUMN_FAMILY)));
-        }
+    if (conf.get(SCAN_ROW_STOP) != null) {
+      scan.setStopRow(Bytes.toBytesBinary(conf.get(SCAN_ROW_STOP)));
+    }
 
-        if (conf.get(SCAN_TIMESTAMP) != null) {
-          scan.setTimeStamp(Long.parseLong(conf.get(SCAN_TIMESTAMP)));
-        }
+    if (conf.get(SCAN_COLUMNS) != null) {
+      addColumns(scan, conf.get(SCAN_COLUMNS));
+    }
 
-        if (conf.get(SCAN_TIMERANGE_START) != null && conf.get(SCAN_TIMERANGE_END) != null) {
-          scan.setTimeRange(
-              Long.parseLong(conf.get(SCAN_TIMERANGE_START)),
-              Long.parseLong(conf.get(SCAN_TIMERANGE_END)));
-        }
+    if (conf.get(SCAN_COLUMN_FAMILY) != null) {
+      scan.addFamily(Bytes.toBytes(conf.get(SCAN_COLUMN_FAMILY)));
+    }
 
-        if (conf.get(SCAN_MAXVERSIONS) != null) {
-          scan.setMaxVersions(Integer.parseInt(conf.get(SCAN_MAXVERSIONS)));
-        }
+    if (conf.get(SCAN_TIMESTAMP) != null) {
+      scan.setTimeStamp(Long.parseLong(conf.get(SCAN_TIMESTAMP)));
+    }
 
-        if (conf.get(SCAN_CACHEDROWS) != null) {
-          scan.setCaching(Integer.parseInt(conf.get(SCAN_CACHEDROWS)));
-        }
+    if (conf.get(SCAN_TIMERANGE_START) != null && conf.get(SCAN_TIMERANGE_END) != null) {
+      scan.setTimeRange(
+          Long.parseLong(conf.get(SCAN_TIMERANGE_START)),
+          Long.parseLong(conf.get(SCAN_TIMERANGE_END)));
+    }
 
-        if (conf.get(SCAN_BATCHSIZE) != null) {
-          scan.setBatch(Integer.parseInt(conf.get(SCAN_BATCHSIZE)));
-        }
+    if (conf.get(SCAN_MAXVERSIONS) != null) {
+      scan.setMaxVersions(Integer.parseInt(conf.get(SCAN_MAXVERSIONS)));
+    }
 
-        // false by default, full table scans generate too much BC churn
-        scan.setCacheBlocks((conf.getBoolean(SCAN_CACHEBLOCKS, false)));
-      } catch (Exception e) {
-          LOG.error(StringUtils.stringifyException(e));
-      }
+    if (conf.get(SCAN_CACHEDROWS) != null) {
+      scan.setCaching(Integer.parseInt(conf.get(SCAN_CACHEDROWS)));
     }
 
-    setScan(scan);
+    if (conf.get(SCAN_BATCHSIZE) != null) {
+      scan.setBatch(Integer.parseInt(conf.get(SCAN_BATCHSIZE)));
+    }
+
+    // false by default, full table scans generate too much BC churn
+    scan.setCacheBlocks((conf.getBoolean(SCAN_CACHEBLOCKS, false)));
+
+    return scan;
   }
 
   @Override


Mime
View raw message