hbase-commits mailing list archives

From ndimi...@apache.org
Subject svn commit: r1542341 - in /hbase/trunk: hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/ hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ hbase-server/src/test/java/org/ap...
Date Fri, 15 Nov 2013 17:43:53 GMT
Author: ndimiduk
Date: Fri Nov 15 17:43:53 2013
New Revision: 1542341

URL: http://svn.apache.org/r1542341
Log:
HBASE-9165 [mapreduce] Modularize building dependency jars

 - Separate adding HBase and dependencies from adding other job
   dependencies, and expose it as a separate method that other
   projects can use (for PIG-3285).
 - Explicitly add hbase-server to the list of dependencies we ship
   with the job, for users who extend the classes we provide (see
   HBASE-9112).
 - Add integration test for addDependencyJars.
 - Code reuse for TestTableMapReduce.
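
The first item above (exposing the HBase-only step so other projects can reuse it, per PIG-3285) suggests a usage pattern along these lines. This is a hedged sketch, not part of the commit: it assumes only the methods visible in this diff, and FrameworkJobSetup is a hypothetical class standing in for the calling framework's own code.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;

public class FrameworkJobSetup {
  public static void configure(Configuration conf) throws IOException {
    // Ship HBase's own modules and third-party dependencies, and nothing else.
    TableMapReduceUtil.addHBaseDependencyJars(conf);
    // The caller remains in control of any additional jars it needs; here it
    // ships the jar containing one of its own classes.
    TableMapReduceUtil.addDependencyJars(conf, FrameworkJobSetup.class);
  }

  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    configure(conf);
    // "tmpjars" is the Hadoop property the jar paths accumulate into.
    System.out.println(conf.get("tmpjars"));
  }
}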

Added:
    hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestTableMapReduceUtil.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java
Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableMapper.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java

Added: hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestTableMapReduceUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestTableMapReduceUtil.java?rev=1542341&view=auto
==============================================================================
--- hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestTableMapReduceUtil.java (added)
+++ hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestTableMapReduceUtil.java Fri Nov 15 17:43:53 2013
@@ -0,0 +1,113 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.IntegrationTestingUtility;
+import org.apache.hadoop.hbase.IntegrationTests;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test that we add tmpjars correctly, including the named dependencies. Runs
+ * as an integration test so that the classpath is realistic.
+ */
+@Category(IntegrationTests.class)
+public class IntegrationTestTableMapReduceUtil implements Configurable, Tool {
+
+  private static IntegrationTestingUtility util;
+
+  @BeforeClass
+  public static void provisionCluster() throws Exception {
+    if (null == util) {
+      util = new IntegrationTestingUtility();
+    }
+  }
+
+  @Before
+  public void skipMiniCluster() {
+    // test probably also works with a local cluster, but
+    // IntegrationTestingUtility doesn't support this concept.
+    assumeTrue("test requires a distributed cluster.", util.isDistributedCluster());
+  }
+
+  /**
+   * Look for jars we expect to be on the classpath by name.
+   */
+  @Test
+  public void testAddDependencyJars() throws Exception {
+    Job job = new Job();
+    TableMapReduceUtil.addDependencyJars(job);
+    String tmpjars = job.getConfiguration().get("tmpjars");
+
+    // verify presence of modules
+    assertTrue(tmpjars.contains("hbase-common"));
+    assertTrue(tmpjars.contains("hbase-protocol"));
+    assertTrue(tmpjars.contains("hbase-client"));
+    assertTrue(tmpjars.contains("hbase-hadoop-compat"));
+    assertTrue(tmpjars.contains("hbase-server"));
+
+    // verify presence of 3rd party dependencies.
+    assertTrue(tmpjars.contains("zookeeper"));
+    assertTrue(tmpjars.contains("netty"));
+    assertTrue(tmpjars.contains("protobuf"));
+    assertTrue(tmpjars.contains("guava"));
+    assertTrue(tmpjars.contains("htrace"));
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    provisionCluster();
+    skipMiniCluster();
+    testAddDependencyJars();
+    return 0;
+  }
+
+  public void setConf(Configuration conf) {
+    if (util != null) {
+      throw new IllegalArgumentException(
+          "setConf not supported after the test has been initialized.");
+    }
+    util = new IntegrationTestingUtility(conf);
+  }
+
+  @Override
+  public Configuration getConf() {
+    return util.getConfiguration();
+  }
+
+  public static void main(String[] args) throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    IntegrationTestingUtility.setUseDistributedCluster(conf);
+    int status = ToolRunner.run(conf, new IntegrationTestTableMapReduceUtil(), args);
+    System.exit(status);
+  }
+}

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java?rev=1542341&r1=1542340&r2=1542341&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java Fri Nov 15 17:43:53 2013
@@ -37,12 +37,11 @@ import org.apache.hadoop.hbase.zookeeper
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.mapred.TextOutputFormat;
-import org.apache.hadoop.mapred.jobcontrol.Job;
 import org.apache.hadoop.security.token.Token;
 import org.apache.zookeeper.KeeperException;
 
@@ -52,7 +51,7 @@ import org.apache.zookeeper.KeeperExcept
 @Deprecated
 @InterfaceAudience.Public
 @InterfaceStability.Stable
-@SuppressWarnings("unchecked")
+@SuppressWarnings({ "rawtypes", "unchecked" })
 public class TableMapReduceUtil {
 
   /**
@@ -297,15 +296,14 @@ public class TableMapReduceUtil {
   }
 
   /**
-   * @see org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil#addDependencyJars(Job)
+   * @see org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil#addDependencyJars(org.apache.hadoop.mapreduce.Job)
    */
   public static void addDependencyJars(JobConf job) throws IOException {
+    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addHBaseDependencyJars(job);
     org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJars(
       job,
-      org.apache.zookeeper.ZooKeeper.class,
-      org.jboss.netty.channel.ChannelFactory.class,
-      com.google.common.base.Function.class,
-      com.google.protobuf.Message.class,
+      // when making changes here, consider also mapreduce.TableMapReduceUtil
+      // pull job classes
       job.getMapOutputKeyClass(),
       job.getMapOutputValueClass(),
       job.getOutputKeyClass(),
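
The mapred flavor above now routes through the same addHBaseDependencyJars helper before adding the job's own configured classes. A minimal, hedged sketch of driving it from old-API code, assuming an HBase client classpath; "tmpjars" is the same property the new integration test reads back.

import java.io.IOException;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.mapred.JobConf;

public class MapredDependencyJarsSketch {
  public static void main(String[] args) throws IOException {
    JobConf jobConf = new JobConf(HBaseConfiguration.create());
    // Per the hunk above: first the HBase module and third-party jars, then the
    // job's configured classes (map output key/value, output key, and so on).
    org.apache.hadoop.hbase.mapred.TableMapReduceUtil.addDependencyJars(jobConf);
    // Comma-separated jar paths that JobClient will ship with the job.
    System.out.println(jobConf.get("tmpjars"));
  }
}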

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableMapper.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableMapper.java?rev=1542341&r1=1542340&r2=1542341&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableMapper.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableMapper.java Fri Nov 15 17:43:53 2013
@@ -45,7 +45,7 @@ extends TableMapper<ImmutableBytesWritab
    * @param job  The job configuration.
    * @throws IOException When setting up the job fails.
    */
-  @SuppressWarnings("unchecked")
+  @SuppressWarnings("rawtypes")
   public static void initJob(String table, Scan scan,
     Class<? extends TableMapper> mapper, Job job) throws IOException {
     TableMapReduceUtil.initTableMapperJob(table, scan, mapper,

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java?rev=1542341&r1=1542340&r2=1542341&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java Fri Nov 15 17:43:53 2013
@@ -71,7 +71,7 @@ import com.google.protobuf.InvalidProtoc
 /**
  * Utility for {@link TableMapper} and {@link TableReducer}
  */
-@SuppressWarnings("unchecked")
+@SuppressWarnings({ "rawtypes", "unchecked" })
 @InterfaceAudience.Public
 @InterfaceStability.Stable
 public class TableMapReduceUtil {
@@ -558,24 +558,44 @@ public class TableMapReduceUtil {
   }
 
   /**
+   * Add HBase and its dependencies (only) to the job configuration.
+   * <p>
+   * This is intended as a low-level API, facilitating code reuse between this
+   * class and its mapred counterpart. It is also of use to external tools that
+   * need to build a MapReduce job that interacts with HBase but want
+   * fine-grained control over the jars shipped to the cluster.
+   * </p>
+   * @param conf The Configuration object to extend with dependencies.
+   * @see org.apache.hadoop.hbase.mapred.TableMapReduceUtil
+   * @see <a href="https://issues.apache.org/jira/browse/PIG-3285">PIG-3285</a>
+   */
+  public static void addHBaseDependencyJars(Configuration conf) throws IOException {
+    addDependencyJars(conf,
+      // explicitly pull a class from each module
+      org.apache.hadoop.hbase.HConstants.class,                      // hbase-common
+      org.apache.hadoop.hbase.protobuf.generated.ClientProtos.class, // hbase-protocol
+      org.apache.hadoop.hbase.client.Put.class,                      // hbase-client
+      org.apache.hadoop.hbase.CompatibilityFactory.class,            // hbase-hadoop-compat
+      org.apache.hadoop.hbase.mapreduce.TableMapper.class,           // hbase-server
+      // pull necessary dependencies
+      org.apache.zookeeper.ZooKeeper.class,
+      org.jboss.netty.channel.ChannelFactory.class,
+      com.google.protobuf.Message.class,
+      com.google.common.collect.Lists.class,
+      org.cloudera.htrace.Trace.class);
+  }
+
+
+  /**
    * Add the HBase dependency jars as well as jars for any of the configured
    * job classes to the job configuration, so that JobClient will ship them
    * to the cluster and add them to the DistributedCache.
    */
   public static void addDependencyJars(Job job) throws IOException {
+    addHBaseDependencyJars(job.getConfiguration());
     try {
       addDependencyJars(job.getConfiguration(),
-          // explicitly pull a class from each module
-          org.apache.hadoop.hbase.HConstants.class,                      // hbase-common
-          org.apache.hadoop.hbase.protobuf.generated.ClientProtos.class, // hbase-protocol
-          org.apache.hadoop.hbase.client.Put.class,                      // hbase-client
-          org.apache.hadoop.hbase.CompatibilityFactory.class,            // hbase-hadoop-compat
-          // pull necessary dependencies
-          org.apache.zookeeper.ZooKeeper.class,
-          org.jboss.netty.channel.ChannelFactory.class,
-          com.google.protobuf.Message.class,
-          com.google.common.collect.Lists.class,
-          org.cloudera.htrace.Trace.class,
+          // when making changes here, consider also mapred.TableMapReduceUtil
           // pull job classes
           job.getMapOutputKeyClass(),
           job.getMapOutputValueClass(),
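
With hbase-server now shipped explicitly (the HBASE-9112 item in the log), a job whose mapper extends a class provided by that module resolves on the cluster without hand-managing jars. A hedged sketch of the new-API path; it assumes only classes referenced in this diff, and ShipHBaseServerSketch/MyMapper are hypothetical names.

import java.io.IOException;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.mapreduce.Job;

public class ShipHBaseServerSketch {

  /** Hypothetical user mapper extending a class that lives in hbase-server. */
  static class MyMapper extends TableMapper<ImmutableBytesWritable, Put> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
      // user logic goes here
    }
  }

  public static void main(String[] args) throws Exception {
    Job job = new Job(HBaseConfiguration.create(), "ship-hbase-server");
    job.setMapperClass(MyMapper.class);
    // Pulls hbase-common, hbase-protocol, hbase-client, hbase-hadoop-compat and
    // hbase-server, plus zookeeper/netty/protobuf/guava/htrace, via
    // addHBaseDependencyJars, before adding the job's own configured classes.
    TableMapReduceUtil.addDependencyJars(job);
    System.out.println(job.getConfiguration().get("tmpjars"));
  }
}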

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java?rev=1542341&r1=1542340&r2=1542341&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java Fri Nov 15 17:43:53 2013
@@ -18,23 +18,20 @@
  */
 package org.apache.hadoop.hbase.mapred;
 
+import static org.junit.Assert.assertTrue;
+
 import java.io.File;
 import java.io.IOException;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.NavigableMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.LargeTests;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.mapreduce.TestTableMapReduceBase;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
@@ -42,101 +39,40 @@ import org.apache.hadoop.mapred.MapReduc
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.RunningJob;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import static org.junit.Assert.fail;
-import static org.junit.Assert.assertTrue;
-
 /**
  * Test Map/Reduce job over HBase tables. The map/reduce process we're testing
  * on our tables is simple - take every row in the table, reverse the value of
  * a particular cell, and write it back to the table.
  */
 @Category(LargeTests.class)
-public class TestTableMapReduce {
+@SuppressWarnings("deprecation")
+public class TestTableMapReduce extends TestTableMapReduceBase {
   private static final Log LOG =
     LogFactory.getLog(TestTableMapReduce.class.getName());
-  private static final HBaseTestingUtility UTIL =
-    new HBaseTestingUtility();
-  static final byte[] MULTI_REGION_TABLE_NAME = Bytes.toBytes("mrtest");
-  static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
-  static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text");
-
-  private static final byte [][] columns = new byte [][] {
-    INPUT_FAMILY,
-    OUTPUT_FAMILY
-  };
-
-  @BeforeClass
-  public static void beforeClass() throws Exception {
-    UTIL.startMiniCluster();
-    HTable table = UTIL.createTable(MULTI_REGION_TABLE_NAME, new byte[][] {INPUT_FAMILY, OUTPUT_FAMILY});
-    UTIL.createMultiRegions(table, INPUT_FAMILY);
-    UTIL.loadTable(table, INPUT_FAMILY);
-    UTIL.startMiniMapReduceCluster();
-  }
 
-  @AfterClass
-  public static void afterClass() throws Exception {
-    UTIL.shutdownMiniMapReduceCluster();
-    UTIL.shutdownMiniCluster();
-  }
+  protected Log getLog() { return LOG; }
 
   /**
    * Pass the given key and processed record reduce
    */
-  public static class ProcessContentsMapper
-  extends MapReduceBase
-  implements TableMap<ImmutableBytesWritable, Put> {
+  static class ProcessContentsMapper extends MapReduceBase implements
+      TableMap<ImmutableBytesWritable, Put> {
+
     /**
      * Pass the key, and reversed value to reduce
-     * @param key
-     * @param value
-     * @param output
-     * @param reporter
-     * @throws IOException
      */
     public void map(ImmutableBytesWritable key, Result value,
       OutputCollector<ImmutableBytesWritable, Put> output,
       Reporter reporter)
     throws IOException {
-      if (value.size() != 1) {
-        throw new IOException("There should only be one input column");
-      }
-      Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
-        cf = value.getMap();
-      if(!cf.containsKey(INPUT_FAMILY)) {
-        throw new IOException("Wrong input columns. Missing: '" +
-          Bytes.toString(INPUT_FAMILY) + "'.");
-      }
-
-      // Get the original value and reverse it
-
-      String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, null));
-      StringBuilder newValue = new StringBuilder(originalValue);
-      newValue.reverse();
-
-      // Now set the value to be collected
-
-      Put outval = new Put(key.get());
-      outval.add(OUTPUT_FAMILY, null, Bytes.toBytes(newValue.toString()));
-      output.collect(key, outval);
+      output.collect(key, TestTableMapReduceBase.map(key, value));
     }
   }
 
-  /**
-   * Test a map/reduce against a multi-region table
-   * @throws IOException
-   */
-  @Test
-  public void testMultiRegionTable() throws IOException {
-    runTestOnTable(new HTable(UTIL.getConfiguration(), MULTI_REGION_TABLE_NAME));
-  }
-
-  private void runTestOnTable(HTable table) throws IOException {
+  @Override
+  protected void runTestOnTable(HTable table) throws IOException {
     JobConf jobConf = null;
     try {
       LOG.info("Before map/reduce startup");
@@ -162,102 +98,5 @@ public class TestTableMapReduce {
       }
     }
   }
-
-  private void verify(String tableName) throws IOException {
-    HTable table = new HTable(UTIL.getConfiguration(), tableName);
-    boolean verified = false;
-    long pause = UTIL.getConfiguration().getLong("hbase.client.pause", 5 * 1000);
-    int numRetries = UTIL.getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
-    for (int i = 0; i < numRetries; i++) {
-      try {
-        LOG.info("Verification attempt #" + i);
-        verifyAttempt(table);
-        verified = true;
-        break;
-      } catch (NullPointerException e) {
-        // If here, a cell was empty.  Presume its because updates came in
-        // after the scanner had been opened.  Wait a while and retry.
-        LOG.debug("Verification attempt failed: " + e.getMessage());
-      }
-      try {
-        Thread.sleep(pause);
-      } catch (InterruptedException e) {
-        // continue
-      }
-    }
-    assertTrue(verified);
-  }
-
-  /**
-   * Looks at every value of the mapreduce output and verifies that indeed
-   * the values have been reversed.
-   * @param table Table to scan.
-   * @throws IOException
-   * @throws NullPointerException if we failed to find a cell value
-   */
-  private void verifyAttempt(final HTable table) throws IOException, NullPointerException {
-    Scan scan = new Scan();
-    TableInputFormat.addColumns(scan, columns);
-    ResultScanner scanner = table.getScanner(scan);
-    try {
-      Iterator<Result> itr = scanner.iterator();
-      assertTrue(itr.hasNext());
-      while(itr.hasNext()) {
-        Result r = itr.next();
-        if (LOG.isDebugEnabled()) {
-          if (r.size() > 2 ) {
-            throw new IOException("Too many results, expected 2 got " +
-              r.size());
-          }
-        }
-        byte[] firstValue = null;
-        byte[] secondValue = null;
-        int count = 0;
-         for(Cell kv : r.listCells()) {
-          if (count == 0) {
-            firstValue = CellUtil.cloneValue(kv);
-          }
-          if (count == 1) {
-            secondValue = CellUtil.cloneValue(kv);;
-          }
-          count++;
-          if (count == 2) {
-            break;
-          }
-        }
-
-
-        String first = "";
-        if (firstValue == null) {
-          throw new NullPointerException(Bytes.toString(r.getRow()) +
-            ": first value is null");
-        }
-        first = Bytes.toString(firstValue);
-
-        String second = "";
-        if (secondValue == null) {
-          throw new NullPointerException(Bytes.toString(r.getRow()) +
-            ": second value is null");
-        }
-        byte[] secondReversed = new byte[secondValue.length];
-        for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) {
-          secondReversed[i] = secondValue[j];
-        }
-        second = Bytes.toString(secondReversed);
-
-        if (first.compareTo(second) != 0) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("second key is not the reverse of first. row=" +
-                r.getRow() + ", first value=" + first + ", second value=" +
-                second);
-          }
-          fail();
-        }
-      }
-    } finally {
-      scanner.close();
-    }
-  }
-
 }
 

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java?rev=1542341&r1=1542340&r2=1542341&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java Fri Nov 15 17:43:53 2013
@@ -58,34 +58,15 @@ import org.junit.experimental.categories
  * a particular cell, and write it back to the table.
  */
 @Category(LargeTests.class)
-public class TestTableMapReduce {
+public class TestTableMapReduce extends TestTableMapReduceBase {
   private static final Log LOG = LogFactory.getLog(TestTableMapReduce.class);
-  private static final HBaseTestingUtility UTIL =
-    new HBaseTestingUtility();
-  static final byte[] MULTI_REGION_TABLE_NAME = Bytes.toBytes("mrtest");
-  static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
-  static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text");
-
-  @BeforeClass
-  public static void beforeClass() throws Exception {
-    UTIL.startMiniCluster();
-    HTable table = UTIL.createTable(MULTI_REGION_TABLE_NAME, new byte[][] {INPUT_FAMILY, OUTPUT_FAMILY});
-    UTIL.createMultiRegions(table, INPUT_FAMILY);
-    UTIL.loadTable(table, INPUT_FAMILY);
-    UTIL.startMiniMapReduceCluster();
-  }
 
-  @AfterClass
-  public static void afterClass() throws Exception {
-    UTIL.shutdownMiniMapReduceCluster();
-    UTIL.shutdownMiniCluster();
-  }
+  protected Log getLog() { return LOG; }
 
   /**
    * Pass the given key and processed record reduce
    */
-  public static class ProcessContentsMapper
-  extends TableMapper<ImmutableBytesWritable, Put> {
+  static class ProcessContentsMapper extends TableMapper<ImmutableBytesWritable, Put> {
 
     /**
      * Pass the key, and reversed value to reduce
@@ -119,30 +100,7 @@ public class TestTableMapReduce {
     }
   }
 
-  /**
-   * Test a map/reduce against a multi-region table
-   * @throws IOException
-   * @throws ClassNotFoundException
-   * @throws InterruptedException
-   */
-  @Test
-  public void testMultiRegionTable()
-  throws IOException, InterruptedException, ClassNotFoundException {
-    runTestOnTable(new HTable(new Configuration(UTIL.getConfiguration()),
-      MULTI_REGION_TABLE_NAME));
-  }
-
-  @Test
-  public void testCombiner()
-      throws IOException, InterruptedException, ClassNotFoundException {
-    Configuration conf = new Configuration(UTIL.getConfiguration());
-    // force use of combiner for testing purposes
-    conf.setInt("min.num.spills.for.combine", 1);
-    runTestOnTable(new HTable(conf, MULTI_REGION_TABLE_NAME));
-  }
-
-  private void runTestOnTable(HTable table)
-  throws IOException, InterruptedException, ClassNotFoundException {
+  protected void runTestOnTable(HTable table) throws IOException {
     Job job = null;
     try {
       LOG.info("Before map/reduce startup");
@@ -164,6 +122,10 @@ public class TestTableMapReduce {
 
       // verify map-reduce results
       verify(Bytes.toString(table.getTableName()));
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    } catch (ClassNotFoundException e) {
+      throw new IOException(e);
     } finally {
       table.close();
       if (job != null) {
@@ -172,123 +134,4 @@ public class TestTableMapReduce {
       }
     }
   }
-
-  private void verify(String tableName) throws IOException {
-    HTable table = new HTable(new Configuration(UTIL.getConfiguration()), tableName);
-    boolean verified = false;
-    long pause = UTIL.getConfiguration().getLong("hbase.client.pause", 5 * 1000);
-    int numRetries = UTIL.getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
-    for (int i = 0; i < numRetries; i++) {
-      try {
-        LOG.info("Verification attempt #" + i);
-        verifyAttempt(table);
-        verified = true;
-        break;
-      } catch (NullPointerException e) {
-        // If here, a cell was empty.  Presume its because updates came in
-        // after the scanner had been opened.  Wait a while and retry.
-        LOG.debug("Verification attempt failed: " + e.getMessage());
-      }
-      try {
-        Thread.sleep(pause);
-      } catch (InterruptedException e) {
-        // continue
-      }
-    }
-    assertTrue(verified);
-    table.close();
-  }
-
-  /**
-   * Looks at every value of the mapreduce output and verifies that indeed
-   * the values have been reversed.
-   *
-   * @param table Table to scan.
-   * @throws IOException
-   * @throws NullPointerException if we failed to find a cell value
-   */
-  private void verifyAttempt(final HTable table) throws IOException, NullPointerException {
-    Scan scan = new Scan();
-    scan.addFamily(INPUT_FAMILY);
-    scan.addFamily(OUTPUT_FAMILY);
-    ResultScanner scanner = table.getScanner(scan);
-    try {
-      Iterator<Result> itr = scanner.iterator();
-      assertTrue(itr.hasNext());
-      while(itr.hasNext()) {
-        Result r = itr.next();
-        if (LOG.isDebugEnabled()) {
-          if (r.size() > 2 ) {
-            throw new IOException("Too many results, expected 2 got " +
-              r.size());
-          }
-        }
-        byte[] firstValue = null;
-        byte[] secondValue = null;
-        int count = 0;
-        for(Cell kv : r.listCells()) {
-          if (count == 0) {
-            firstValue = CellUtil.cloneValue(kv);
-          }
-          if (count == 1) {
-            secondValue = CellUtil.cloneValue(kv);
-          }
-          count++;
-          if (count == 2) {
-            break;
-          }
-        }
-
-        String first = "";
-        if (firstValue == null) {
-          throw new NullPointerException(Bytes.toString(r.getRow()) +
-            ": first value is null");
-        }
-        first = Bytes.toString(firstValue);
-
-        String second = "";
-        if (secondValue == null) {
-          throw new NullPointerException(Bytes.toString(r.getRow()) +
-            ": second value is null");
-        }
-        byte[] secondReversed = new byte[secondValue.length];
-        for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) {
-          secondReversed[i] = secondValue[j];
-        }
-        second = Bytes.toString(secondReversed);
-
-        if (first.compareTo(second) != 0) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("second key is not the reverse of first. row=" +
-                Bytes.toStringBinary(r.getRow()) + ", first value=" + first +
-                ", second value=" + second);
-          }
-          fail();
-        }
-      }
-    } finally {
-      scanner.close();
-    }
-  }
-
-  /**
-   * Test that we add tmpjars correctly including the ZK jar.
-   */
-  public void testAddDependencyJars() throws Exception {
-    Job job = new Job();
-    TableMapReduceUtil.addDependencyJars(job);
-    String tmpjars = job.getConfiguration().get("tmpjars");
-
-    System.err.println("tmpjars: " + tmpjars);
-    assertTrue(tmpjars.contains("zookeeper"));
-    assertFalse(tmpjars.contains("guava"));
-
-    System.err.println("appending guava jar");
-    TableMapReduceUtil.addDependencyJars(job.getConfiguration(), 
-        com.google.common.base.Function.class);
-    tmpjars = job.getConfiguration().get("tmpjars");
-    assertTrue(tmpjars.contains("guava"));
-  }
-
 }
-

Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java?rev=1542341&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java Fri Nov 15 17:43:53 2013
@@ -0,0 +1,227 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * A base class for a test Map/Reduce job over HBase tables. The map/reduce process we're testing
+ * on our tables is simple - take every row in the table, reverse the value of a particular cell,
+ * and write it back to the table. Implements common components between mapred and mapreduce
+ * implementations.
+ */
+public abstract class TestTableMapReduceBase {
+
+  protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+  protected static final byte[] MULTI_REGION_TABLE_NAME = Bytes.toBytes("mrtest");
+  protected static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
+  protected static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text");
+
+  protected static final byte[][] columns = new byte[][] {
+    INPUT_FAMILY,
+    OUTPUT_FAMILY
+  };
+
+  /**
+   * Retrieve my logger instance.
+   */
+  protected abstract Log getLog();
+
+  /**
+   * Handles API-specifics for setting up and executing the job.
+   */
+  protected abstract void runTestOnTable(HTable table) throws IOException;
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    UTIL.startMiniCluster();
+    HTable table =
+        UTIL.createTable(MULTI_REGION_TABLE_NAME, new byte[][] { INPUT_FAMILY, OUTPUT_FAMILY });
+    UTIL.createMultiRegions(table, INPUT_FAMILY);
+    UTIL.loadTable(table, INPUT_FAMILY);
+    UTIL.startMiniMapReduceCluster();
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    UTIL.shutdownMiniMapReduceCluster();
+    UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Test a map/reduce against a multi-region table
+   * @throws IOException
+   */
+  @Test
+  public void testMultiRegionTable() throws IOException {
+    runTestOnTable(new HTable(UTIL.getConfiguration(), MULTI_REGION_TABLE_NAME));
+  }
+
+  @Test
+  public void testCombiner() throws IOException {
+    Configuration conf = new Configuration(UTIL.getConfiguration());
+    // force use of combiner for testing purposes
+    conf.setInt("min.num.spills.for.combine", 1);
+    runTestOnTable(new HTable(conf, MULTI_REGION_TABLE_NAME));
+  }
+
+  /**
+   * Implements mapper logic for use across APIs.
+   */
+  protected static Put map(ImmutableBytesWritable key, Result value) throws IOException {
+    if (value.size() != 1) {
+      throw new IOException("There should only be one input column");
+    }
+    Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
+      cf = value.getMap();
+    if(!cf.containsKey(INPUT_FAMILY)) {
+      throw new IOException("Wrong input columns. Missing: '" +
+        Bytes.toString(INPUT_FAMILY) + "'.");
+    }
+
+    // Get the original value and reverse it
+
+    String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, null));
+    StringBuilder newValue = new StringBuilder(originalValue);
+    newValue.reverse();
+
+    // Now set the value to be collected
+
+    Put outval = new Put(key.get());
+    outval.add(OUTPUT_FAMILY, null, Bytes.toBytes(newValue.toString()));
+    return outval;
+  }
+
+  protected void verify(String tableName) throws IOException {
+    HTable table = new HTable(UTIL.getConfiguration(), tableName);
+    boolean verified = false;
+    long pause = UTIL.getConfiguration().getLong("hbase.client.pause", 5 * 1000);
+    int numRetries = UTIL.getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
+    for (int i = 0; i < numRetries; i++) {
+      try {
+        getLog().info("Verification attempt #" + i);
+        verifyAttempt(table);
+        verified = true;
+        break;
+      } catch (NullPointerException e) {
+        // If here, a cell was empty. Presume it's because updates came in
+        // after the scanner had been opened. Wait a while and retry.
+        getLog().debug("Verification attempt failed: " + e.getMessage());
+      }
+      try {
+        Thread.sleep(pause);
+      } catch (InterruptedException e) {
+        // continue
+      }
+    }
+    assertTrue(verified);
+  }
+
+  /**
+   * Looks at every value of the mapreduce output and verifies that indeed
+   * the values have been reversed.
+   * @param table Table to scan.
+   * @throws IOException
+   * @throws NullPointerException if we failed to find a cell value
+   */
+  private void verifyAttempt(final HTable table) throws IOException, NullPointerException {
+    Scan scan = new Scan();
+    TableInputFormat.addColumns(scan, columns);
+    ResultScanner scanner = table.getScanner(scan);
+    try {
+      Iterator<Result> itr = scanner.iterator();
+      assertTrue(itr.hasNext());
+      while(itr.hasNext()) {
+        Result r = itr.next();
+        if (getLog().isDebugEnabled()) {
+          if (r.size() > 2 ) {
+            throw new IOException("Too many results, expected 2 got " +
+              r.size());
+          }
+        }
+        byte[] firstValue = null;
+        byte[] secondValue = null;
+        int count = 0;
+         for(Cell kv : r.listCells()) {
+          if (count == 0) {
+            firstValue = CellUtil.cloneValue(kv);
+          }
+          if (count == 1) {
+            secondValue = CellUtil.cloneValue(kv);
+          }
+          count++;
+          if (count == 2) {
+            break;
+          }
+        }
+
+
+        if (firstValue == null) {
+          throw new NullPointerException(Bytes.toString(r.getRow()) +
+            ": first value is null");
+        }
+        String first = Bytes.toString(firstValue);
+
+        if (secondValue == null) {
+          throw new NullPointerException(Bytes.toString(r.getRow()) +
+            ": second value is null");
+        }
+        byte[] secondReversed = new byte[secondValue.length];
+        for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) {
+          secondReversed[i] = secondValue[j];
+        }
+        String second = Bytes.toString(secondReversed);
+
+        if (first.compareTo(second) != 0) {
+          if (getLog().isDebugEnabled()) {
+            getLog().debug("second key is not the reverse of first. row=" +
+                Bytes.toStringBinary(r.getRow()) + ", first value=" + first +
+                ", second value=" + second);
+          }
+          fail();
+        }
+      }
+    } finally {
+      scanner.close();
+    }
+  }
+}

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java?rev=1542341&r1=1542340&r2=1542341&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java Fri Nov 15 17:43:53 2013
@@ -15,8 +15,11 @@
 
 package org.apache.hadoop.hbase.mapreduce;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.SmallTests;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.LongWritable;
@@ -24,19 +27,15 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import static org.junit.Assert.*;
 
 /**
- * Test class TableMapReduceUtil
+ * Test different variants of initTableMapperJob method
  */
-
-@Category(LargeTests.class)
+@Category(SmallTests.class)
 public class TestTableMapReduceUtil {
-  /**
-   * Test different variants ofinitTableMapperJob method
-   */
-  @Test (timeout=600000)
-  public void testInitTableMapperJob() throws Exception {
+
+  @Test
+  public void testInitTableMapperJob1() throws Exception {
     Configuration configuration = new Configuration();
     Job job = new Job(configuration, "tableName");
     // test 
@@ -48,9 +47,12 @@ public class TestTableMapReduceUtil {
     assertEquals(Text.class, job.getOutputValueClass());
     assertNull(job.getCombinerClass());
     assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
+  }
 
-    configuration = new Configuration();
-    job = new Job(configuration, "tableName");
+  @Test
+  public void testInitTableMapperJob2() throws Exception {
+    Configuration configuration = new Configuration();
+    Job job = new Job(configuration, "tableName");
     TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(),
         Import.Importer.class, Text.class, Text.class, job, false, HLogInputFormat.class);
     assertEquals(HLogInputFormat.class, job.getInputFormatClass());
@@ -59,9 +61,12 @@ public class TestTableMapReduceUtil {
     assertEquals(Text.class, job.getOutputValueClass());
     assertNull(job.getCombinerClass());
     assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
+  }
 
-    configuration = new Configuration();
-    job = new Job(configuration, "tableName");
+  @Test
+  public void testInitTableMapperJob3() throws Exception {
+    Configuration configuration = new Configuration();
+    Job job = new Job(configuration, "tableName");
     TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(),
         Import.Importer.class, Text.class, Text.class, job);
     assertEquals(TableInputFormat.class, job.getInputFormatClass());
@@ -70,9 +75,12 @@ public class TestTableMapReduceUtil {
     assertEquals(Text.class, job.getOutputValueClass());
     assertNull(job.getCombinerClass());
     assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
+  }
 
-    configuration = new Configuration();
-    job = new Job(configuration, "tableName");
+  @Test
+  public void testInitTableMapperJob4() throws Exception {
+    Configuration configuration = new Configuration();
+    Job job = new Job(configuration, "tableName");
     TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(),
         Import.Importer.class, Text.class, Text.class, job, false);
     assertEquals(TableInputFormat.class, job.getInputFormatClass());
@@ -82,4 +90,4 @@ public class TestTableMapReduceUtil {
     assertNull(job.getCombinerClass());
     assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
   }
-}
\ No newline at end of file
+}


