hbase-commits mailing list archives

From st...@apache.org
Subject svn commit: r833952 - in /hadoop/hbase/trunk: ./ src/java/org/apache/hadoop/hbase/client/ src/java/org/apache/hadoop/hbase/mapreduce/ src/test/org/apache/hadoop/hbase/ src/test/org/apache/hadoop/hbase/mapreduce/
Date Mon, 09 Nov 2009 00:41:16 GMT
Author: stack
Date: Mon Nov  9 00:41:15 2009
New Revision: 833952

URL: http://svn.apache.org/viewvc?rev=833952&view=rev
Log:
HBASE-1829 Make use of start/stop row in TableInputFormat

Added:
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan.java
Modified:
    hadoop/hbase/trunk/CHANGES.txt
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnection.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/HBaseTestingUtility.java

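What the change means for users: with this patch TableInputFormat derives its
splits from the Scan's start and stop rows, so a job can be limited to a key
sub-range instead of always covering every region. A minimal driver sketch,
illustrative only -- the table name "scantest", family "contents", and
MyMapper are placeholders, while the initTableMapperJob signature is the one
used by the new test below:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.mapreduce.Job;

    // Restrict the job to the key range ["bbb", "ccc"); TableInputFormat
    // will only create splits for regions overlapping that range.
    Scan scan = new Scan();
    scan.addFamily(Bytes.toBytes("contents"));
    scan.setStartRow(Bytes.toBytes("bbb"));   // inclusive
    scan.setStopRow(Bytes.toBytes("ccc"));    // exclusive
    Job job = new Job(new Configuration(), "RangeScan");
    TableMapReduceUtil.initTableMapperJob("scantest", scan, MyMapper.class,
      ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
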
Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=833952&r1=833951&r2=833952&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Mon Nov  9 00:41:15 2009
@@ -167,6 +167,8 @@
    HBASE-1945  Remove META and ROOT memcache size bandaid 
    HBASE-1947  If HBase starts/stops often in less than 24 hours, 
                you end up with lots of store files
+   HBASE-1829  Make use of start/stop row in TableInputFormat
+               (Lars George via Stack)
 
   OPTIMIZATIONS
    HBASE-410   [testing] Speed up the test suite

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnection.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnection.java?rev=833952&r1=833951&r2=833952&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnection.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnection.java Mon Nov  9 00:41:15 2009
@@ -111,6 +111,11 @@
   throws IOException;
   
   /**
+   * Allows flushing the region cache.
+   */
+  public void clearRegionCache(); 
+  
+  /**
    * Find the location of the region of <i>tableName</i> that <i>row</i>
    * lives in, ignoring any value that might be in the cache.
    * @param tableName name of the table <i>row</i> is in

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=833952&r1=833951&r2=833952&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java Mon Nov  9 00:41:15 2009
@@ -836,6 +836,13 @@
       return result;
     }
 
+    /**
+     * Allows flushing the region cache.
+     */
+    public void clearRegionCache() {
+      cachedRegionLocations.clear();
+    }
+    
     /*
      * Put a newly discovered HRegionLocation into the cache.
      */

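The new clearRegionCache() is there for callers that modify .META. behind the
connection's back and need subsequent lookups to re-read the catalog;
createMultiRegions() in HBaseTestingUtility below is the first such caller. A
minimal usage sketch (assumes an existing HBaseConfiguration "conf"):

    import org.apache.hadoop.hbase.client.HBaseAdmin;
    import org.apache.hadoop.hbase.client.HConnection;

    // After editing region rows in .META. directly, drop the client-side
    // location cache so the next region lookup hits the catalog again.
    HBaseAdmin admin = new HBaseAdmin(conf);
    HConnection conn = admin.getConnection();
    conn.clearRegionCache();
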
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java?rev=833952&r1=833951&r2=833952&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java Mon Nov  9 00:41:15 2009
@@ -20,17 +20,18 @@
 package org.apache.hadoop.hbase.mapreduce;
 
 import java.io.IOException;
-import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -269,29 +270,40 @@
    */
   @Override
   public List<InputSplit> getSplits(JobContext context) throws IOException {
     if (table == null) {
       throw new IOException("No table was provided.");
     }
+    Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
+    if (keys == null || keys.getFirst() == null ||
+        keys.getFirst().length == 0) {
+      throw new IOException("Expecting at least one region.");
+    }
-    byte [][] startKeys = table.getStartKeys();
-    if (startKeys == null || startKeys.length == 0) {
-      throw new IOException("Expecting at least one region.");
-    }
-    int realNumSplits = startKeys.length;
-    InputSplit[] splits = new InputSplit[realNumSplits];
-    int middle = startKeys.length / realNumSplits;
-    int startPos = 0;
-    for (int i = 0; i < realNumSplits; i++) {
-      int lastPos = startPos + middle;
-      lastPos = startKeys.length % realNumSplits > i ? lastPos + 1 : lastPos;
-      String regionLocation = table.getRegionLocation(startKeys[startPos]).
-        getServerAddress().getHostname(); 
-      splits[i] = new TableSplit(this.table.getTableName(),
-        startKeys[startPos], ((i + 1) < realNumSplits) ? startKeys[lastPos]:
-          HConstants.EMPTY_START_ROW, regionLocation);
-      LOG.info("split: " + i + "->" + splits[i]);
-      startPos = lastPos;
+    int count = 0;
+    List<InputSplit> splits = new ArrayList<InputSplit>(keys.getFirst().length);
+
+    for (int i = 0; i < keys.getFirst().length; i++) {
+      String regionLocation = table.getRegionLocation(keys.getFirst()[i]).
+        getServerAddress().getHostname();
+      byte[] startRow = scan.getStartRow();
+      byte[] stopRow = scan.getStopRow();
+      // determine if the given start and stop keys fall into the region
+      if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
+           Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
+          (stopRow.length == 0 || 
+           Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
+        byte[] splitStart = startRow.length == 0 || 
+          Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ? 
+            keys.getFirst()[i] : startRow;
+        byte[] splitStop = stopRow.length == 0 || 
+          Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0 ? 
+            keys.getSecond()[i] : stopRow;
+        InputSplit split = new TableSplit(table.getTableName(),
+          splitStart, splitStop, regionLocation);
+        splits.add(split);
+        if (LOG.isDebugEnabled()) 
+          LOG.debug("getSplits: split -> " + (count++) + " -> " + split);
+      }
     }
-    return Arrays.asList(splits);
+    return splits;
   }
 
   /**

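For readability, here is the split-clipping rule from getSplits() pulled out
as a standalone helper. This is an illustrative sketch, not part of the
commit: the name "clip" is invented, Bytes.compareTo is the real HBase
utility, and empty byte arrays stand for unbounded table boundaries, all
mirroring the committed logic above. As a worked example, with regions
["nnn","ooo") and ["ooo","ppp"), a scan of ["obb","opp") yields the two
splits ["obb","ooo") and ["ooo","opp").

    import org.apache.hadoop.hbase.util.Bytes;

    /**
     * Clips region [regionStart, regionEnd) against scan range
     * [scanStart, scanStop). Returns {splitStart, splitStop} if the two
     * ranges overlap, or null if the region lies outside the scan.
     */
    static byte[][] clip(byte[] regionStart, byte[] regionEnd,
        byte[] scanStart, byte[] scanStop) {
      boolean overlaps =
        (scanStart.length == 0 || regionEnd.length == 0 ||
         Bytes.compareTo(scanStart, regionEnd) < 0) &&
        (scanStop.length == 0 ||
         Bytes.compareTo(scanStop, regionStart) > 0);
      if (!overlaps) {
        return null;
      }
      // split start: the later of the region start and the scan start
      byte[] splitStart = scanStart.length == 0 ||
        Bytes.compareTo(regionStart, scanStart) >= 0 ? regionStart : scanStart;
      // split stop: the earlier of the region end and the scan stop (an
      // empty regionEnd compares low and is kept, exactly as in the patch)
      byte[] splitStop = scanStop.length == 0 ||
        Bytes.compareTo(regionEnd, scanStop) <= 0 ? regionEnd : scanStop;
      return new byte[][] { splitStart, splitStop };
    }
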
Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/HBaseTestingUtility.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/HBaseTestingUtility.java?rev=833952&r1=833951&r2=833952&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/HBaseTestingUtility.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/HBaseTestingUtility.java Mon Nov  9 00:41:15 2009
@@ -21,19 +21,32 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.UUID;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Jdk14Logger;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.MiniMRCluster;
 
 /**
  * Facility for testing HBase. Added as tool to abet junit4 testing.  Replaces
@@ -45,11 +58,14 @@
  * make changes to configuration parameters.
  */
 public class HBaseTestingUtility {
+  
   private final Log LOG = LogFactory.getLog(getClass());
+  
   private final HBaseConfiguration conf = new HBaseConfiguration();
   private MiniZooKeeperCluster zkCluster = null;
   private MiniDFSCluster dfsCluster = null;
   private MiniHBaseCluster hbaseCluster = null;
+  private MiniMRCluster mrCluster = null;
   private File clusterTestBuildDir = null;
 
   /** System property key to get test directory value.
@@ -185,7 +201,7 @@
   }
 
   /**
-   * Flusheds all caches in the mini hbase cluster
+   * Flushes all caches in the mini hbase cluster
    * @throws IOException
    */
   public void flush() throws IOException {
@@ -200,7 +216,7 @@
    * @return An HTable instance for the created table.
    * @throws IOException
    */
-  public HTable createTable(byte [] tableName, byte [] family) 
+  public HTable createTable(byte[] tableName, byte[] family) 
   throws IOException{
     return createTable(tableName, new byte[][]{family});
   }
@@ -212,10 +228,10 @@
    * @return An HTable instance for the created table.
    * @throws IOException
    */
-  public HTable createTable(byte [] tableName, byte [][] families) 
+  public HTable createTable(byte[] tableName, byte[][] families) 
   throws IOException {
     HTableDescriptor desc = new HTableDescriptor(tableName);
-    for(byte [] family : families) {
+    for(byte[] family : families) {
       desc.addFamily(new HColumnDescriptor(family));
     }
     (new HBaseAdmin(getConfiguration())).createTable(desc);
@@ -230,7 +246,7 @@
    * @return An HTable instance for the created table.
    * @throws IOException
    */
-  public HTable createTable(byte [] tableName, byte [] family, int numVersions)
+  public HTable createTable(byte[] tableName, byte[] family, int numVersions)
   throws IOException {
     return createTable(tableName, new byte[][]{family}, numVersions);
   }
@@ -243,11 +259,11 @@
    * @return An HTable instance for the created table.
    * @throws IOException
    */
-  public HTable createTable(byte [] tableName, byte [][] families,
+  public HTable createTable(byte[] tableName, byte[][] families,
       int numVersions)
   throws IOException {
     HTableDescriptor desc = new HTableDescriptor(tableName);
-    for (byte [] family : families) {
+    for (byte[] family : families) {
       HColumnDescriptor hcd = new HColumnDescriptor(family, numVersions,
           HColumnDescriptor.DEFAULT_COMPRESSION,
           HColumnDescriptor.DEFAULT_IN_MEMORY,
@@ -267,12 +283,12 @@
    * @return An HTable instance for the created table.
    * @throws IOException
    */
-  public HTable createTable(byte [] tableName, byte [][] families,
-      int [] numVersions)
+  public HTable createTable(byte[] tableName, byte[][] families,
+      int[] numVersions)
   throws IOException {
     HTableDescriptor desc = new HTableDescriptor(tableName);
     int i = 0;
-    for (byte [] family : families) {
+    for (byte[] family : families) {
       HColumnDescriptor hcd = new HColumnDescriptor(family, numVersions[i],
           HColumnDescriptor.DEFAULT_COMPRESSION,
           HColumnDescriptor.DEFAULT_IN_MEMORY,
@@ -292,17 +308,17 @@
    * @return Count of rows loaded.
    * @throws IOException
    */
-  public int loadTable(final HTable t, final byte [] f) throws IOException {
+  public int loadTable(final HTable t, final byte[] f) throws IOException {
     byte[] k = new byte[3];
     int rowCount = 0;
-    for (byte b1 = 'a'; b1 < 'z'; b1++) {
-      for (byte b2 = 'a'; b2 < 'z'; b2++) {
-        for (byte b3 = 'a'; b3 < 'z'; b3++) {
+    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
+      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
+        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
           k[0] = b1;
           k[1] = b2;
           k[2] = b3;
           Put put = new Put(k);
-          put.add(f, new byte[0], k);
+          put.add(f, null, k);
           t.put(put);
           rowCount++;
         }
@@ -310,4 +326,149 @@
     }
     return rowCount;
   }
+  
+  /**
+   * Creates many regions covering the rows "aaa" to "zzz".
+   * 
+   * @param table  The table to use for the data.
+   * @param columnFamily  The family to insert the data into.
+   * @throws IOException When creating the regions fails.
+   */
+  public void createMultiRegions(HTable table, byte[] columnFamily) 
+  throws IOException {
+    byte[][] KEYS = {
+      HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("bbb"),
+      Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), 
+      Bytes.toBytes("fff"), Bytes.toBytes("ggg"), Bytes.toBytes("hhh"),
+      Bytes.toBytes("iii"), Bytes.toBytes("jjj"), Bytes.toBytes("kkk"),
+      Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
+      Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"),
+      Bytes.toBytes("rrr"), Bytes.toBytes("sss"), Bytes.toBytes("ttt"),
+      Bytes.toBytes("uuu"), Bytes.toBytes("vvv"), Bytes.toBytes("www"),
+      Bytes.toBytes("xxx"), Bytes.toBytes("yyy")
+    };
+
+    HBaseConfiguration c = getConfiguration();
+    HTable meta = new HTable(c, HConstants.META_TABLE_NAME);
+    HTableDescriptor htd = table.getTableDescriptor();
+    if(!htd.hasFamily(columnFamily)) {
+      HColumnDescriptor hcd = new HColumnDescriptor(columnFamily);
+      htd.addFamily(hcd);
+    }
+    // remove empty region - this is tricky as the mini cluster during the test
+    // setup already has the "<tablename>,,123456789" row with an empty start 
+    // and end key. Adding the custom regions below adds those blindly, 
+    // including the new start region from empty to "bbb". lg 
+    List<byte[]> rows = getMetaTableRows();
+    // add custom ones
+    for (int i = 0; i < KEYS.length; i++) {
+      int j = (i + 1) % KEYS.length;
+      HRegionInfo hri = new HRegionInfo(table.getTableDescriptor(), 
+        KEYS[i], KEYS[j]);
+      Put put = new Put(hri.getRegionName());
+      put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER, 
+        Writables.getBytes(hri));
+      meta.put(put);
+      LOG.info("createMultiRegions: inserted " + hri.toString());
+    }
+    // see comment above, remove "old" (or previous) single region
+    for (byte[] row : rows) {
+      LOG.info("createMultiRegions: deleting meta row -> " + 
+        Bytes.toStringBinary(row));
+      meta.delete(new Delete(row));
+    }
+    // flush cache of regions
+    HBaseAdmin admin = new HBaseAdmin(getConfiguration());
+    HConnection conn = admin.getConnection();
+    conn.clearRegionCache();
+  }
+
+  /**
+   * Returns all rows from the .META. table.
+   *
+   * @throws IOException When reading the rows fails.
+   */
+  public List<byte[]> getMetaTableRows() throws IOException {
+    HTable t = new HTable(this.conf, HConstants.META_TABLE_NAME);
+    List<byte[]> rows = new ArrayList<byte[]>();
+    ResultScanner s = t.getScanner(new Scan());
+    for (Result result : s) {
+      LOG.info("getMetaTableRows: row -> " + 
+        Bytes.toStringBinary(result.getRow()));
+      rows.add(result.getRow());
+    }
+    s.close();
+    return rows;
+  }
+
+  /**
+   * Removes all rows from the .META. in preparation to add custom ones.
+   *
+   * @throws IOException When removing the rows fails.
+   */
+  private void emptyMetaTable() throws IOException {
+    HTable t = new HTable(this.conf, HConstants.META_TABLE_NAME);
+    ArrayList<Delete> deletes = new ArrayList<Delete>();
+    ResultScanner s = t.getScanner(new Scan());
+    for (Result result : s) {
+      LOG.info("emptyMetaTable: remove row -> " + 
+        Bytes.toStringBinary(result.getRow()));
+      Delete del = new Delete(result.getRow());
+      deletes.add(del);
+    }
+    s.close();
+    t.delete(deletes);
+  }
+  
+  /**
+   * Starts a <code>MiniMRCluster</code> with a default number of
+   * <code>TaskTracker</code> instances.
+   *
+   * @throws IOException When starting the cluster fails.
+   */
+  public void startMiniMapReduceCluster() throws IOException {
+    startMiniMapReduceCluster(2);
+  }
+  
+  /**
+   * Starts a <code>MiniMRCluster</code>.
+   *
+   * @param servers  The number of <code>TaskTracker</code> instances to start.
+   * @throws IOException When starting the cluster fails.
+   */
+  public void startMiniMapReduceCluster(final int servers) throws IOException {
+    LOG.info("Starting mini mapreduce cluster...");
+    // These are needed for the new and improved Map/Reduce framework
+    Configuration c = getConfiguration();
+    System.setProperty("hadoop.log.dir", c.get("hadoop.log.dir"));
+    c.set("mapred.output.dir", c.get("hadoop.tmp.dir"));
+    mrCluster = new MiniMRCluster(servers, 
+      FileSystem.get(c).getUri().toString(), 1);
+    LOG.info("Mini mapreduce cluster started");
+  }
+  
+  /**
+   * Stops the previously started <code>MiniMRCluster</code>. 
+   */
+  public void shutdownMiniMapReduceCluster() {
+    LOG.info("Stopping mini mapreduce cluster...");
+    if (mrCluster != null) {
+      mrCluster.shutdown();
+    }
+    LOG.info("Mini mapreduce cluster stopped");
+  }
+
+  /**
+   * Switches the logger for the given class to DEBUG level.
+   *
+   * @param clazz  The class for which to switch to debug logging.
+   */
+  public void enableDebug(Class<?> clazz) {
+    Log l = LogFactory.getLog(clazz);
+    if (l instanceof Log4JLogger) {
+      ((Log4JLogger) l).getLogger().setLevel(org.apache.log4j.Level.DEBUG);
+    } else if (l instanceof Jdk14Logger) {
+      ((Jdk14Logger) l).getLogger().setLevel(java.util.logging.Level.ALL);
+    }
+  }
 }
\ No newline at end of file

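For reference, createMultiRegions() above leaves the table with 25 regions:
[empty, "bbb"), ["bbb", "ccc"), ..., ["yyy", empty). A quick sanity check
against that layout could look like the following sketch (illustrative;
"table" is an HTable on the prepared table, and getStartEndKeys() is assumed
to return regions in start-key order):

    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.hbase.util.Pair;

    Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
    assert keys.getFirst().length == 25;       // one entry per region
    assert keys.getFirst()[0].length == 0;     // first region starts at empty
    assert Bytes.equals(keys.getFirst()[1], Bytes.toBytes("bbb"));
    assert keys.getSecond()[24].length == 0;   // last region ends at empty
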
Added: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan.java?rev=833952&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan.java (added)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan.java Mon Nov  9 00:41:15 2009
@@ -0,0 +1,361 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests various scan start and stop row scenarios. The boundaries are set
+ * on a Scan and verified in a MapReduce job to confirm they are handed
+ * over and honored there as well.
+ */
+public class TestTableInputFormatScan {
+   
+  static final Log LOG = LogFactory.getLog(TestTableInputFormatScan.class);
+  static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  static final byte[] TABLE_NAME = Bytes.toBytes("scantest");
+  static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
+  static final String KEY_STARTROW = "startRow";
+  static final String KEY_LASTROW = "stopRow";
+  
+  private static HTable table = null;
+  
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // switch TIF to log at DEBUG level
+    TEST_UTIL.enableDebug(TableInputFormat.class);
+    TEST_UTIL.enableDebug(TableInputFormatBase.class);
+    // start mini hbase cluster
+    TEST_UTIL.startMiniCluster(3);
+    // create and fill table
+    table = TEST_UTIL.createTable(TABLE_NAME, INPUT_FAMILY);
+    TEST_UTIL.createMultiRegions(table, INPUT_FAMILY);
+    TEST_UTIL.loadTable(table, INPUT_FAMILY);
+    // start MR cluster
+    TEST_UTIL.startMiniMapReduceCluster();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniMapReduceCluster();
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    // nothing
+  }
+  
+  /**
+   * @throws java.lang.Exception
+   */
+  @After
+  public void tearDown() throws Exception {
+    Configuration c = TEST_UTIL.getConfiguration();
+    FileUtil.fullyDelete(new File(c.get("hadoop.tmp.dir")));
+  }
+
+  /**
+   * Pass the key and value to reduce.
+   */
+  public static class ScanMapper
+  extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
+  
+    /**
+     * Pass the key and value to reduce.
+     * 
+     * @param key  The key, here "aaa", "aab" etc. 
+     * @param value  The value is the same as the key.
+     * @param context  The task context.
+     * @throws IOException When reading the rows fails.
+     */
+    @Override
+    public void map(ImmutableBytesWritable key, Result value,
+      Context context) 
+    throws IOException, InterruptedException {
+      if (value.size() != 1) {
+        throw new IOException("There should only be one input column");
+      }
+      Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> 
+        cf = value.getMap();
+      if(!cf.containsKey(INPUT_FAMILY)) {
+        throw new IOException("Wrong input columns. Missing: '" + 
+          Bytes.toString(INPUT_FAMILY) + "'.");
+      }
+      String val = Bytes.toStringBinary(value.getValue(INPUT_FAMILY, null));
+      LOG.info("map: key -> " + Bytes.toStringBinary(key.get()) + 
+        ", value -> " + val);
+      context.write(key, key);
+    }
+    
+  }
+  
+  /**
+   * Checks the last and first key seen against the scanner boundaries.
+   */
+  public static class ScanReducer 
+  extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable, 
+                  NullWritable, NullWritable> {
+    
+    private String first = null;
+    private String last = null;
+    
+    protected void reduce(ImmutableBytesWritable key, 
+        Iterable<ImmutableBytesWritable> values, Context context) 
+    throws IOException, InterruptedException {
+      int count = 0;
+      for (ImmutableBytesWritable value : values) {
+        String val = Bytes.toStringBinary(value.get());
+        LOG.info("reduce: key[" + count + "] -> " + 
+          Bytes.toStringBinary(key.get()) + ", value -> " + val);
+        if (first == null) first = val;
+        last = val;
+        count++;
+      }
+    }
+    
+    protected void cleanup(Context context) 
+    throws IOException, InterruptedException {
+      Configuration c = context.getConfiguration();
+      String startRow = c.get(KEY_STARTROW);    
+      String lastRow = c.get(KEY_LASTROW);
+      LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" + startRow +
"\"");
+      LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow + "\"");
+      if (startRow != null && startRow.length() > 0) { 
+        assertEquals(startRow, first);
+      }
+      if (lastRow != null && lastRow.length() > 0) { 
+        assertEquals(lastRow, last);
+      }
+    }
+    
+  }
+  
+  /**
+   * Tests a MR scan using specific start and stop rows.
+   * 
+   * @throws IOException
+   * @throws ClassNotFoundException 
+   * @throws InterruptedException 
+   */
+  @Test
+  public void testScanEmptyToEmpty() 
+  throws IOException, InterruptedException, ClassNotFoundException {
+    testScan(null, null, null);
+  }
+
+  /**
+   * Tests a MR scan using specific start and stop rows.
+   * 
+   * @throws IOException
+   * @throws ClassNotFoundException 
+   * @throws InterruptedException 
+   */
+  @Test
+  public void testScanEmptyToAPP() 
+  throws IOException, InterruptedException, ClassNotFoundException {
+    testScan(null, "app", "apo");
+  }
+  
+  /**
+   * Tests a MR scan using specific start and stop rows.
+   * 
+   * @throws IOException
+   * @throws ClassNotFoundException 
+   * @throws InterruptedException 
+   */
+  @Test
+  public void testScanEmptyToBBA() 
+  throws IOException, InterruptedException, ClassNotFoundException {
+    testScan(null, "bba", "baz");
+  }
+  
+  /**
+   * Tests a MR scan using specific start and stop rows.
+   * 
+   * @throws IOException
+   * @throws ClassNotFoundException 
+   * @throws InterruptedException 
+   */
+  @Test
+  public void testScanEmptyToBBB() 
+  throws IOException, InterruptedException, ClassNotFoundException {
+    testScan(null, "bbb", "bba");
+  }
+  
+  /**
+   * Tests a MR scan using specific start and stop rows.
+   * 
+   * @throws IOException
+   * @throws ClassNotFoundException 
+   * @throws InterruptedException 
+   */
+  @Test
+  public void testScanEmptyToOPP() 
+  throws IOException, InterruptedException, ClassNotFoundException {
+    testScan(null, "opp", "opo");
+  }
+  
+  /**
+   * Tests a MR scan using specific start and stop rows.
+   * 
+   * @throws IOException
+   * @throws ClassNotFoundException 
+   * @throws InterruptedException 
+   */
+  @Test
+  public void testScanOBBToOPP() 
+  throws IOException, InterruptedException, ClassNotFoundException {
+    testScan("obb", "opp", "opo");
+  }
+
+  /**
+   * Tests a MR scan using specific start and stop rows.
+   * 
+   * @throws IOException
+   * @throws ClassNotFoundException 
+   * @throws InterruptedException 
+   */
+  @Test
+  public void testScanOBBToQPP() 
+  throws IOException, InterruptedException, ClassNotFoundException {
+    testScan("obb", "qpp", "qpo");
+  }
+  
+  /**
+   * Tests a MR scan using specific start and stop rows.
+   * 
+   * @throws IOException
+   * @throws ClassNotFoundException 
+   * @throws InterruptedException 
+   */
+  @Test
+  public void testScanOPPToEmpty() 
+  throws IOException, InterruptedException, ClassNotFoundException {
+    testScan("opp", null, null);
+  }
+  
+  /**
+   * Tests a MR scan using specific start and stop rows.
+   * 
+   * @throws IOException
+   * @throws ClassNotFoundException 
+   * @throws InterruptedException 
+   */
+  @Test
+  public void testScanYYXToEmpty() 
+  throws IOException, InterruptedException, ClassNotFoundException {
+    testScan("yyx", null, null);
+  }
+
+  /**
+   * Tests a MR scan using specific start and stop rows.
+   * 
+   * @throws IOException
+   * @throws ClassNotFoundException 
+   * @throws InterruptedException 
+   */
+  @Test
+  public void testScanYYYToEmpty() 
+  throws IOException, InterruptedException, ClassNotFoundException {
+    testScan("yyy", null, null);
+  }
+
+  /**
+   * Tests a MR scan using specific start and stop rows.
+   * 
+   * @throws IOException
+   * @throws ClassNotFoundException 
+   * @throws InterruptedException 
+   */
+  @Test
+  public void testScanYZYToEmpty() 
+  throws IOException, InterruptedException, ClassNotFoundException {
+    testScan("yzy", null, null);
+  }
+
+  /**
+   * Tests a MR scan using specific start and stop rows.
+   * 
+   * @throws IOException
+   * @throws ClassNotFoundException 
+   * @throws InterruptedException 
+   */
+  @SuppressWarnings("deprecation")
+  private void testScan(String start, String stop, String last) 
+  throws IOException, InterruptedException, ClassNotFoundException {
+    String jobName = "Scan" + (start != null ? start.toUpperCase() : "Empty") +
+    "To" + (stop != null ? stop.toUpperCase() : "Empty");
+    LOG.info("Before map/reduce startup - job " + jobName);
+    Configuration c = TEST_UTIL.getConfiguration();
+    Scan scan = new Scan();
+    scan.addFamily(INPUT_FAMILY);
+    if (start != null) {
+      scan.setStartRow(Bytes.toBytes(start));
+    }
+    c.set(KEY_STARTROW, start != null ? start : "");
+    if (stop != null) {
+      scan.setStopRow(Bytes.toBytes(stop));
+    }
+    c.set(KEY_LASTROW, last != null ? last : "");
+    LOG.info("scan before: " + scan);
+    Job job = new Job(c, jobName);
+    TableMapReduceUtil.initTableMapperJob(
+      Bytes.toString(TABLE_NAME), scan, ScanMapper.class, 
+      ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
+    job.setReducerClass(ScanReducer.class);
+    job.setNumReduceTasks(1); // one to get final "first" and "last" key
+    FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));      
+    LOG.info("Started " + job.getJobName());
+    job.waitForCompletion(true);
+    LOG.info("Job status: " + job.getStatus());
+    assertTrue(job.getStatus().getState() == JobStatus.State.SUCCEEDED);
+    LOG.info("After map/reduce completion - job " + jobName);
+  }
+}


