hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject svn commit: r1293098 - in /hbase/trunk/src: main/java/org/apache/hadoop/hbase/mapreduce/MultithreadedTableMapper.java test/java/org/apache/hadoop/hbase/mapreduce/TestMulitthreadedTableMapper.java
Date Fri, 24 Feb 2012 06:38:19 GMT
Author: stack
Date: Fri Feb 24 06:38:18 2012
New Revision: 1293098

URL: http://svn.apache.org/viewvc?rev=1293098&view=rev
Log:
HBASE-5166 MultiThreaded Table Mapper analogous to MultiThreaded Mapper in hadoop

Added:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/MultithreadedTableMapper.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMulitthreadedTableMapper.java

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/MultithreadedTableMapper.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/MultithreadedTableMapper.java?rev=1293098&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/MultithreadedTableMapper.java
(added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/MultithreadedTableMapper.java
Fri Feb 24 06:38:18 2012
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.StatusReporter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+
+
+/**
+ * Multithreaded implementation for @link org.apache.hbase.mapreduce.TableMapper
+ * <p>
+ * It can be used instead when the Map operation is not CPU
+ * bound in order to improve throughput.
+ * <p>
+ * Mapper implementations using this MapRunnable must be thread-safe.
+ * <p>
+ * The Map-Reduce job has to be configured with the mapper to use via
+ * {@link #setMapperClass(Configuration, Class)} and
+ * the number of thread the thread-pool can use with the
+ * {@link #getNumberOfThreads(Configuration) method. The default
+ * value is 10 threads.
+ * <p>
+ */
+
+public class MultithreadedTableMapper<K2, V2> extends TableMapper<K2, V2> {
+  private static final Log LOG = LogFactory.getLog(MultithreadedTableMapper.class);
+  private Class<? extends Mapper<ImmutableBytesWritable, Result,K2,V2>> mapClass;
+  private Context outer;
+  private ExecutorService executor;
+  public static final String NUMBER_OF_THREADS = "hbase.mapreduce.multithreadedmapper.threads";
+  public static final String MAPPER_CLASS = "hbase.mapreduce.multithreadedmapper.mapclass";
+
+  /**
+   * The number of threads in the thread pool that will run the map function.
+   * @param job the job
+   * @return the number of threads
+   */
+  public static int getNumberOfThreads(JobContext job) {
+    return job.getConfiguration().
+        getInt(NUMBER_OF_THREADS, 10);
+  }
+
+  /**
+   * Set the number of threads in the pool for running maps.
+   * @param job the job to modify
+   * @param threads the new number of threads
+   */
+  public static void setNumberOfThreads(Job job, int threads) {
+    job.getConfiguration().setInt(NUMBER_OF_THREADS,
+        threads);
+  }
+
+  /**
+   * Get the application's mapper class.
+   * @param <K2> the map's output key type
+   * @param <V2> the map's output value type
+   * @param job the job
+   * @return the mapper class to run
+   */
+  @SuppressWarnings("unchecked")
+  public static <K2,V2>
+  Class<Mapper<ImmutableBytesWritable, Result,K2,V2>> getMapperClass(JobContext
job) {
+    return (Class<Mapper<ImmutableBytesWritable, Result,K2,V2>>)
+        job.getConfiguration().getClass( MAPPER_CLASS,
+            Mapper.class);
+  }
+
+  /**
+   * Set the application's mapper class.
+   * @param <K2> the map output key type
+   * @param <V2> the map output value type
+   * @param job the job to modify
+   * @param cls the class to use as the mapper
+   */
+  public static <K2,V2>
+  void setMapperClass(Job job,
+      Class<? extends Mapper<ImmutableBytesWritable, Result,K2,V2>> cls) {
+    if (MultithreadedTableMapper.class.isAssignableFrom(cls)) {
+      throw new IllegalArgumentException("Can't have recursive " +
+          "MultithreadedTableMapper instances.");
+    }
+    job.getConfiguration().setClass(MAPPER_CLASS,
+        cls, Mapper.class);
+  }
+
+  /**
+   * Run the application's maps using a thread pool.
+   */
+  @Override
+  public void run(Context context) throws IOException, InterruptedException {
+    outer = context;
+    int numberOfThreads = getNumberOfThreads(context);
+    mapClass = getMapperClass(context);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Configuring multithread runner to use " + numberOfThreads +
+          " threads");
+    }
+    executor = Executors.newFixedThreadPool(numberOfThreads);
+    for(int i=0; i < numberOfThreads; ++i) {
+      MapRunner thread = new MapRunner(context);
+      executor.execute(thread);
+    }
+    executor.shutdown();
+    while (!executor.isTerminated()) {
+      // wait till all the threads are done
+      Thread.sleep(1000);
+    }
+  }
+
+  private class SubMapRecordReader
+  extends RecordReader<ImmutableBytesWritable, Result> {
+    private ImmutableBytesWritable key;
+    private Result value;
+    private Configuration conf;
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+      return 0;
+    }
+
+    @Override
+    public void initialize(InputSplit split,
+        TaskAttemptContext context
+        ) throws IOException, InterruptedException {
+      conf = context.getConfiguration();
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      synchronized (outer) {
+        if (!outer.nextKeyValue()) {
+          return false;
+        }
+        key = ReflectionUtils.copy(outer.getConfiguration(),
+            outer.getCurrentKey(), key);
+        value = ReflectionUtils.copy(conf, outer.getCurrentValue(), value);
+        return true;
+      }
+    }
+
+    public ImmutableBytesWritable getCurrentKey() {
+      return key;
+    }
+
+    @Override
+    public Result getCurrentValue() {
+      return value;
+    }
+  }
+
+  private class SubMapRecordWriter extends RecordWriter<K2,V2> {
+
+    @Override
+    public void close(TaskAttemptContext context) throws IOException,
+    InterruptedException {
+    }
+
+    @Override
+    public void write(K2 key, V2 value) throws IOException,
+    InterruptedException {
+      synchronized (outer) {
+        outer.write(key, value);
+      }
+    }
+  }
+
+  private class SubMapStatusReporter extends StatusReporter {
+
+    @Override
+    public Counter getCounter(Enum<?> name) {
+      return outer.getCounter(name);
+    }
+
+    @Override
+    public Counter getCounter(String group, String name) {
+      return outer.getCounter(group, name);
+    }
+
+    @Override
+    public void progress() {
+      outer.progress();
+    }
+
+    @Override
+    public void setStatus(String status) {
+      outer.setStatus(status);
+    }
+  }
+
+  private class MapRunner implements Runnable {
+    private Mapper<ImmutableBytesWritable, Result, K2,V2> mapper;
+    private Context subcontext;
+    private Throwable throwable;
+
+    MapRunner(Context context) throws IOException, InterruptedException {
+      mapper = ReflectionUtils.newInstance(mapClass,
+          context.getConfiguration());
+      subcontext = new Context(outer.getConfiguration(), 
+          outer.getTaskAttemptID(),
+          new SubMapRecordReader(),
+          new SubMapRecordWriter(),
+          context.getOutputCommitter(),
+          new SubMapStatusReporter(),
+          outer.getInputSplit());
+    }
+
+    @Override
+    public void run() {
+      try {
+        mapper.run(subcontext);
+      } catch (Throwable ie) {
+        throwable = ie;
+      }
+    }
+  }
+}
\ No newline at end of file

Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMulitthreadedTableMapper.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMulitthreadedTableMapper.java?rev=1293098&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMulitthreadedTableMapper.java
(added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMulitthreadedTableMapper.java
Fri Feb 24 06:38:18 2012
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.fail;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test Map/Reduce job over HBase tables. The map/reduce process we're testing
+ * on our tables is simple - take every row in the table, reverse the value of
+ * a particular cell, and write it back to the table.
+ */
+@Category(LargeTests.class)
+public class TestMulitthreadedTableMapper {
+  private static final Log LOG = LogFactory.getLog(TestMulitthreadedTableMapper.class);
+  private static final HBaseTestingUtility UTIL =
+      new HBaseTestingUtility();
+  static final String MULTI_REGION_TABLE_NAME = "mrtest";
+  static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
+  static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text");
+  static final int    NUMBER_OF_THREADS = 10;
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    HTableDescriptor desc = new HTableDescriptor(MULTI_REGION_TABLE_NAME);
+    desc.addFamily(new HColumnDescriptor(INPUT_FAMILY));
+    desc.addFamily(new HColumnDescriptor(OUTPUT_FAMILY));
+    UTIL.startMiniCluster();
+    HBaseAdmin admin = new HBaseAdmin(UTIL.getConfiguration());
+    admin.createTable(desc, HBaseTestingUtility.KEYS);
+    UTIL.startMiniMapReduceCluster();
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    UTIL.shutdownMiniMapReduceCluster();
+    UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Pass the given key and processed record reduce
+   */
+  public static class ProcessContentsMapper
+  extends TableMapper<ImmutableBytesWritable, Put> {
+
+    /**
+     * Pass the key, and reversed value to reduce
+     *
+     * @param key
+     * @param value
+     * @param context
+     * @throws IOException
+     */
+    public void map(ImmutableBytesWritable key, Result value,
+        Context context)
+            throws IOException, InterruptedException {
+      if (value.size() != 1) {
+        throw new IOException("There should only be one input column");
+      }
+      Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
+      cf = value.getMap();
+      if(!cf.containsKey(INPUT_FAMILY)) {
+        throw new IOException("Wrong input columns. Missing: '" +
+            Bytes.toString(INPUT_FAMILY) + "'.");
+      }
+      // Get the original value and reverse it
+      String originalValue = new String(value.getValue(INPUT_FAMILY, null),
+          HConstants.UTF8_ENCODING);
+      StringBuilder newValue = new StringBuilder(originalValue);
+      newValue.reverse();
+      // Now set the value to be collected
+      Put outval = new Put(key.get());
+      outval.add(OUTPUT_FAMILY, null, Bytes.toBytes(newValue.toString()));
+      context.write(key, outval);
+    }
+  }
+
+  /**
+   * Test multithreadedTableMappper map/reduce against a multi-region table
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testMultithreadedTableMapper()
+      throws IOException, InterruptedException, ClassNotFoundException {
+    runTestOnTable(new HTable(new Configuration(UTIL.getConfiguration()),
+        MULTI_REGION_TABLE_NAME));
+  }
+
+  private void runTestOnTable(HTable table)
+      throws IOException, InterruptedException, ClassNotFoundException {
+    Job job = null;
+    try {
+      LOG.info("Before map/reduce startup");
+      job = new Job(table.getConfiguration(), "process column contents");
+      job.setNumReduceTasks(1);
+      Scan scan = new Scan();
+      scan.addFamily(INPUT_FAMILY);
+      TableMapReduceUtil.initTableMapperJob(
+          Bytes.toString(table.getTableName()), scan,
+          MultithreadedTableMapper.class, ImmutableBytesWritable.class,
+          Put.class, job);
+      MultithreadedTableMapper.setMapperClass(job, ProcessContentsMapper.class);
+      MultithreadedTableMapper.setNumberOfThreads(job, NUMBER_OF_THREADS);
+      TableMapReduceUtil.initTableReducerJob(
+          Bytes.toString(table.getTableName()),
+          IdentityTableReducer.class, job);
+      FileOutputFormat.setOutputPath(job, new Path("test"));
+      LOG.info("Started " + Bytes.toString(table.getTableName()));
+      job.waitForCompletion(true);
+      LOG.info("After map/reduce completion");
+      // verify map-reduce results
+      verify(Bytes.toString(table.getTableName()));
+    } finally {
+      table.close();
+      if (job != null) {
+        FileUtil.fullyDelete(
+            new File(job.getConfiguration().get("hadoop.tmp.dir")));
+      }
+    }
+  }
+
+  private void verify(String tableName) throws IOException {
+    HTable table = new HTable(new Configuration(UTIL.getConfiguration()), tableName);
+    boolean verified = false;
+    long pause = UTIL.getConfiguration().getLong("hbase.client.pause", 5 * 1000);
+    int numRetries = UTIL.getConfiguration().getInt("hbase.client.retries.number", 5);
+    for (int i = 0; i < numRetries; i++) {
+      try {
+        LOG.info("Verification attempt #" + i);
+        verifyAttempt(table);
+        verified = true;
+        break;
+      } catch (NullPointerException e) {
+        // If here, a cell was empty.  Presume its because updates came in
+        // after the scanner had been opened.  Wait a while and retry.
+        LOG.debug("Verification attempt failed: " + e.getMessage());
+      }
+      try {
+        Thread.sleep(pause);
+      } catch (InterruptedException e) {
+        // continue
+      }
+    }
+    assertTrue(verified);
+    table.close();
+  }
+
+  /**
+   * Looks at every value of the mapreduce output and verifies that indeed
+   * the values have been reversed.
+   *
+   * @param table Table to scan.
+   * @throws IOException
+   * @throws NullPointerException if we failed to find a cell value
+   */
+  private void verifyAttempt(final HTable table)
+      throws IOException, NullPointerException {
+    Scan scan = new Scan();
+    scan.addFamily(INPUT_FAMILY);
+    scan.addFamily(OUTPUT_FAMILY);
+    ResultScanner scanner = table.getScanner(scan);
+    try {
+      for (Result r : scanner) {
+        if (LOG.isDebugEnabled()) {
+          if (r.size() > 2 ) {
+            throw new IOException("Too many results, expected 2 got " +
+                r.size());
+          }
+        }
+        byte[] firstValue = null;
+        byte[] secondValue = null;
+        int count = 0;
+        for(KeyValue kv : r.list()) {
+          if (count == 0) {
+            firstValue = kv.getValue();
+          }else if (count == 1) {
+            secondValue = kv.getValue();
+          }else if (count == 2) {
+            break;
+          }
+          count++;
+        }
+        String first = "";
+        if (firstValue == null) {
+          throw new NullPointerException(Bytes.toString(r.getRow()) +
+              ": first value is null");
+        }
+        first = new String(firstValue, HConstants.UTF8_ENCODING);
+        String second = "";
+        if (secondValue == null) {
+          throw new NullPointerException(Bytes.toString(r.getRow()) +
+              ": second value is null");
+        }
+        byte[] secondReversed = new byte[secondValue.length];
+        for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) {
+          secondReversed[i] = secondValue[j];
+        }
+        second = new String(secondReversed, HConstants.UTF8_ENCODING);
+        if (first.compareTo(second) != 0) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("second key is not the reverse of first. row=" +
+                Bytes.toStringBinary(r.getRow()) + ", first value=" + first +
+                ", second value=" + second);
+          }
+          fail();
+        }
+      }
+    } finally {
+      scanner.close();
+    }
+  }
+
+  @org.junit.Rule
+  public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
+  new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
+}
+



Mime
View raw message