hbase-commits mailing list archives

From bus...@apache.org
Subject [42/50] [abbrv] hbase git commit: HBASE-18640 Move mapreduce out of hbase-server into separate module.
Date Sun, 27 Aug 2017 05:33:42 GMT
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormatImpl.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormatImpl.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormatImpl.java
new file mode 100644
index 0000000..4331c0f
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormatImpl.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapreduce;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
+import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
+import org.apache.hadoop.hbase.util.ConfigurationUtil;
+import org.apache.hadoop.hbase.util.FSUtils;
+
+import java.io.IOException;
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Shared implementation of mapreduce code over multiple table snapshots.
+ * Utilized by both the mapreduce
+ * ({@link org.apache.hadoop.hbase.mapreduce.MultiTableSnapshotInputFormat}) and mapred
+ * ({@link org.apache.hadoop.hbase.mapred.MultiTableSnapshotInputFormat}) implementations.
+ */
+@InterfaceAudience.LimitedPrivate({ "HBase" })
+@InterfaceStability.Evolving
+public class MultiTableSnapshotInputFormatImpl {
+
+  private static final Log LOG = LogFactory.getLog(MultiTableSnapshotInputFormatImpl.class);
+
+  public static final String RESTORE_DIRS_KEY =
+      "hbase.MultiTableSnapshotInputFormat.restore.snapshotDirMapping";
+  public static final String SNAPSHOT_TO_SCANS_KEY =
+      "hbase.MultiTableSnapshotInputFormat.snapshotsToScans";
+
+  /**
+   * Configure conf to read from snapshotScans, with snapshots restored to a subdirectory of
+   * restoreDir.
+   * Sets: {@link #RESTORE_DIRS_KEY}, {@link #SNAPSHOT_TO_SCANS_KEY}
+   *
+   * @param conf configuration to push the snapshot-to-scan and restore-directory mappings to
+   * @param snapshotScans mapping from snapshot name to the scans to run against that snapshot
+   * @param restoreDir base directory under which each snapshot is restored
+   * @throws IOException if restoring a snapshot fails
+   */
+  public void setInput(Configuration conf, Map<String, Collection<Scan>> snapshotScans,
+      Path restoreDir) throws IOException {
+    Path rootDir = FSUtils.getRootDir(conf);
+    FileSystem fs = rootDir.getFileSystem(conf);
+
+    setSnapshotToScans(conf, snapshotScans);
+    Map<String, Path> restoreDirs =
+        generateSnapshotToRestoreDirMapping(snapshotScans.keySet(), restoreDir);
+    setSnapshotDirs(conf, restoreDirs);
+    restoreSnapshots(conf, restoreDirs, fs);
+  }
+
+  /**
+   * Return the list of splits extracted from the scans/snapshots pushed to conf by
+   * {@link
+   * #setInput(org.apache.hadoop.conf.Configuration, java.util.Map, org.apache.hadoop.fs.Path)}
+   *
+   * @param conf Configuration to determine splits from
+   * @return the list of splits extracted from the scans/snapshots pushed to conf
+   * @throws IOException if the snapshot manifests or region info cannot be read
+   */
+  public List<TableSnapshotInputFormatImpl.InputSplit> getSplits(Configuration conf)
+      throws IOException {
+    Path rootDir = FSUtils.getRootDir(conf);
+    FileSystem fs = rootDir.getFileSystem(conf);
+
+    List<TableSnapshotInputFormatImpl.InputSplit> rtn = Lists.newArrayList();
+
+    Map<String, Collection<Scan>> snapshotsToScans = getSnapshotsToScans(conf);
+    Map<String, Path> snapshotsToRestoreDirs = getSnapshotDirs(conf);
+    for (Map.Entry<String, Collection<Scan>> entry : snapshotsToScans.entrySet()) {
+      String snapshotName = entry.getKey();
+
+      Path restoreDir = snapshotsToRestoreDirs.get(snapshotName);
+
+      SnapshotManifest manifest =
+          TableSnapshotInputFormatImpl.getSnapshotManifest(conf, snapshotName, rootDir, fs);
+      List<HRegionInfo> regionInfos =
+          TableSnapshotInputFormatImpl.getRegionInfosFromManifest(manifest);
+
+      for (Scan scan : entry.getValue()) {
+        List<TableSnapshotInputFormatImpl.InputSplit> splits =
+            TableSnapshotInputFormatImpl.getSplits(scan, manifest, regionInfos, restoreDir, conf);
+        rtn.addAll(splits);
+      }
+    }
+    return rtn;
+  }
+
+  /**
+   * Retrieve the snapshot name -&gt; list&lt;scan&gt; mapping pushed to configuration by
+   * {@link #setSnapshotToScans(org.apache.hadoop.conf.Configuration, java.util.Map)}
+   *
+   * @param conf Configuration to extract name -&gt; list&lt;scan&gt; mappings from.
+   * @return the snapshot name -&gt; list&lt;scan&gt; mapping pushed to configuration
+   * @throws IOException
+   */
+  public Map<String, Collection<Scan>> getSnapshotsToScans(Configuration conf) throws IOException {
+
+    Map<String, Collection<Scan>> rtn = Maps.newHashMap();
+
+    for (Map.Entry<String, String> entry : ConfigurationUtil
+        .getKeyValues(conf, SNAPSHOT_TO_SCANS_KEY)) {
+      String snapshotName = entry.getKey();
+      String scan = entry.getValue();
+
+      Collection<Scan> snapshotScans = rtn.get(snapshotName);
+      if (snapshotScans == null) {
+        snapshotScans = Lists.newArrayList();
+        rtn.put(snapshotName, snapshotScans);
+      }
+
+      snapshotScans.add(TableMapReduceUtil.convertStringToScan(scan));
+    }
+
+    return rtn;
+  }
+
+  /**
+   * Push snapshotScans to conf (under the key {@link #SNAPSHOT_TO_SCANS_KEY})
+   *
+   * @param conf configuration to write the serialized snapshot-to-scan mapping to
+   * @param snapshotScans mapping from snapshot name to the scans to serialize
+   * @throws IOException if a scan cannot be serialized
+   */
+  public void setSnapshotToScans(Configuration conf, Map<String, Collection<Scan>> snapshotScans)
+      throws IOException {
+    // flatten out snapshotScans for serialization to the job conf
+    List<Map.Entry<String, String>> snapshotToSerializedScans = Lists.newArrayList();
+
+    for (Map.Entry<String, Collection<Scan>> entry : snapshotScans.entrySet()) {
+      String snapshotName = entry.getKey();
+      Collection<Scan> scans = entry.getValue();
+
+      // serialize all scans and map them to the appropriate snapshot
+      for (Scan scan : scans) {
+        snapshotToSerializedScans.add(new AbstractMap.SimpleImmutableEntry<>(snapshotName,
+            TableMapReduceUtil.convertScanToString(scan)));
+      }
+    }
+
+    ConfigurationUtil.setKeyValues(conf, SNAPSHOT_TO_SCANS_KEY, snapshotToSerializedScans);
+  }
+
+  /**
+   * Retrieve the directories into which snapshots have been restored
+   * ({@link #RESTORE_DIRS_KEY})
+   *
+   * @param conf Configuration to extract restore directories from
+   * @return mapping from snapshot name to the directory into which that snapshot was restored
+   * @throws IOException if the mapping cannot be read from conf
+   */
+  public Map<String, Path> getSnapshotDirs(Configuration conf) throws IOException {
+    List<Map.Entry<String, String>> kvps = ConfigurationUtil.getKeyValues(conf, RESTORE_DIRS_KEY);
+    Map<String, Path> rtn = Maps.newHashMapWithExpectedSize(kvps.size());
+
+    for (Map.Entry<String, String> kvp : kvps) {
+      rtn.put(kvp.getKey(), new Path(kvp.getValue()));
+    }
+
+    return rtn;
+  }
+
+  public void setSnapshotDirs(Configuration conf, Map<String, Path> snapshotDirs) {
+    Map<String, String> toSet = Maps.newHashMap();
+
+    for (Map.Entry<String, Path> entry : snapshotDirs.entrySet()) {
+      toSet.put(entry.getKey(), entry.getValue().toString());
+    }
+
+    ConfigurationUtil.setKeyValues(conf, RESTORE_DIRS_KEY, toSet.entrySet());
+  }
+
+  /**
+   * Generate a random path underneath baseRestoreDir for each snapshot in snapshots and
+   * return a map from the snapshot to the restore directory.
+   *
+   * @param snapshots      collection of snapshot names to restore
+   * @param baseRestoreDir base directory under which all snapshots in snapshots will be restored
+   * @return a mapping from snapshot name to the directory in which that snapshot has been restored
+   */
+  private Map<String, Path> generateSnapshotToRestoreDirMapping(Collection<String> snapshots,
+      Path baseRestoreDir) {
+    Map<String, Path> rtn = Maps.newHashMap();
+
+    for (String snapshotName : snapshots) {
+      Path restoreSnapshotDir =
+          new Path(baseRestoreDir, snapshotName + "__" + UUID.randomUUID().toString());
+      rtn.put(snapshotName, restoreSnapshotDir);
+    }
+
+    return rtn;
+  }
+
+  /**
+   * Restore each (snapshot name, restore directory) pair in snapshotToDir
+   *
+   * @param conf          configuration to restore with
+   * @param snapshotToDir mapping from snapshot names to restore directories
+   * @param fs            filesystem to do snapshot restoration on
+   * @throws IOException if any snapshot restore fails
+   */
+  public void restoreSnapshots(Configuration conf, Map<String, Path> snapshotToDir, FileSystem fs)
+      throws IOException {
+    // TODO: restore from record readers to parallelize.
+    Path rootDir = FSUtils.getRootDir(conf);
+
+    for (Map.Entry<String, Path> entry : snapshotToDir.entrySet()) {
+      String snapshotName = entry.getKey();
+      Path restoreDir = entry.getValue();
+      LOG.info("Restoring snapshot " + snapshotName + " into " + restoreDir
+          + " for MultiTableSnapshotInputFormat");
+      restoreSnapshot(conf, snapshotName, rootDir, restoreDir, fs);
+    }
+  }
+
+  void restoreSnapshot(Configuration conf, String snapshotName, Path rootDir, Path restoreDir,
+      FileSystem fs) throws IOException {
+    RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
+  }
+
+}
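
The class above is the shared engine behind both MultiTableSnapshotInputFormat variants: setInput() serializes the snapshot-to-scan mapping into the job configuration and restores each snapshot under a random subdirectory, and getSplits() later reads those keys back to build input splits. A minimal sketch of driving it directly follows; the snapshot names, column families, and restore path are illustrative, not part of the commit.

    import java.util.Arrays;
    import java.util.Collection;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.mapreduce.MultiTableSnapshotInputFormatImpl;
    import org.apache.hadoop.hbase.util.Bytes;

    public class MultiSnapshotScanSetup {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();

        // Map each snapshot name to the scans that should run against it.
        Map<String, Collection<Scan>> snapshotScans = new HashMap<>();
        snapshotScans.put("snapshot_a",
            Collections.singletonList(new Scan().addFamily(Bytes.toBytes("cf"))));
        snapshotScans.put("snapshot_b",
            Arrays.asList(new Scan(), new Scan().addFamily(Bytes.toBytes("cf2"))));

        // Restores each snapshot under a random subdirectory of the restore dir and
        // records both mappings (RESTORE_DIRS_KEY, SNAPSHOT_TO_SCANS_KEY) in conf.
        new MultiTableSnapshotInputFormatImpl()
            .setInput(conf, snapshotScans, new Path("/tmp/snapshot-restore"));
      }
    }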

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultithreadedTableMapper.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultithreadedTableMapper.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultithreadedTableMapper.java
new file mode 100644
index 0000000..a505379
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultithreadedTableMapper.java
@@ -0,0 +1,301 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.StatusReporter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.util.ReflectionUtils;
+
+
+/**
+ * Multithreaded implementation of {@link org.apache.hadoop.hbase.mapreduce.TableMapper}.
+ * <p>
+ * It can be used instead of the plain table mapper when the map operation is not
+ * CPU bound, in order to improve throughput.
+ * <p>
+ * Mapper implementations using this MapRunnable must be thread-safe.
+ * <p>
+ * The Map-Reduce job has to be configured with the mapper to use via
+ * {@link #setMapperClass} and the number of threads the thread pool can use via
+ * {@link #setNumberOfThreads}. The default value is 10 threads.
+ */
+
+public class MultithreadedTableMapper<K2, V2> extends TableMapper<K2, V2> {
+  private static final Log LOG = LogFactory.getLog(MultithreadedTableMapper.class);
+  private Class<? extends Mapper<ImmutableBytesWritable, Result,K2,V2>> mapClass;
+  private Context outer;
+  private ExecutorService executor;
+  public static final String NUMBER_OF_THREADS = "hbase.mapreduce.multithreadedmapper.threads";
+  public static final String MAPPER_CLASS = "hbase.mapreduce.multithreadedmapper.mapclass";
+
+  /**
+   * Get the number of threads in the thread pool that will run the map function.
+   * @param job the job
+   * @return the number of threads
+   */
+  public static int getNumberOfThreads(JobContext job) {
+    return job.getConfiguration().
+        getInt(NUMBER_OF_THREADS, 10);
+  }
+
+  /**
+   * Set the number of threads in the pool for running maps.
+   * @param job the job to modify
+   * @param threads the new number of threads
+   */
+  public static void setNumberOfThreads(Job job, int threads) {
+    job.getConfiguration().setInt(NUMBER_OF_THREADS,
+        threads);
+  }
+
+  /**
+   * Get the application's mapper class.
+   * @param <K2> the map's output key type
+   * @param <V2> the map's output value type
+   * @param job the job
+   * @return the mapper class to run
+   */
+  @SuppressWarnings("unchecked")
+  public static <K2,V2>
+  Class<Mapper<ImmutableBytesWritable, Result,K2,V2>> getMapperClass(JobContext job) {
+    return (Class<Mapper<ImmutableBytesWritable, Result,K2,V2>>)
+        job.getConfiguration().getClass( MAPPER_CLASS,
+            Mapper.class);
+  }
+
+  /**
+   * Set the application's mapper class.
+   * @param <K2> the map output key type
+   * @param <V2> the map output value type
+   * @param job the job to modify
+   * @param cls the class to use as the mapper
+   */
+  public static <K2,V2>
+  void setMapperClass(Job job,
+      Class<? extends Mapper<ImmutableBytesWritable, Result,K2,V2>> cls) {
+    if (MultithreadedTableMapper.class.isAssignableFrom(cls)) {
+      throw new IllegalArgumentException("Can't have recursive " +
+          "MultithreadedTableMapper instances.");
+    }
+    job.getConfiguration().setClass(MAPPER_CLASS,
+        cls, Mapper.class);
+  }
+
+  /**
+   * Run the application's maps using a thread pool.
+   */
+  @Override
+  public void run(Context context) throws IOException, InterruptedException {
+    outer = context;
+    int numberOfThreads = getNumberOfThreads(context);
+    mapClass = getMapperClass(context);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Configuring multithread runner to use " + numberOfThreads +
+          " threads");
+    }
+    executor = Executors.newFixedThreadPool(numberOfThreads);
+    for(int i=0; i < numberOfThreads; ++i) {
+      MapRunner thread = new MapRunner(context);
+      executor.execute(thread);
+    }
+    executor.shutdown();
+    while (!executor.isTerminated()) {
+      // wait till all the threads are done
+      Thread.sleep(1000);
+    }
+  }
+
+  private class SubMapRecordReader
+  extends RecordReader<ImmutableBytesWritable, Result> {
+    private ImmutableBytesWritable key;
+    private Result value;
+    private Configuration conf;
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+      return 0;
+    }
+
+    @Override
+    public void initialize(InputSplit split,
+        TaskAttemptContext context
+        ) throws IOException, InterruptedException {
+      conf = context.getConfiguration();
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      synchronized (outer) {
+        if (!outer.nextKeyValue()) {
+          return false;
+        }
+        key = ReflectionUtils.copy(outer.getConfiguration(),
+            outer.getCurrentKey(), key);
+        value = ReflectionUtils.copy(conf, outer.getCurrentValue(), value);
+        return true;
+      }
+    }
+
+    public ImmutableBytesWritable getCurrentKey() {
+      return key;
+    }
+
+    @Override
+    public Result getCurrentValue() {
+      return value;
+    }
+  }
+
+  private class SubMapRecordWriter extends RecordWriter<K2,V2> {
+
+    @Override
+    public void close(TaskAttemptContext context) throws IOException,
+    InterruptedException {
+    }
+
+    @Override
+    public void write(K2 key, V2 value) throws IOException,
+    InterruptedException {
+      synchronized (outer) {
+        outer.write(key, value);
+      }
+    }
+  }
+
+  private class SubMapStatusReporter extends StatusReporter {
+
+    @Override
+    public Counter getCounter(Enum<?> name) {
+      return outer.getCounter(name);
+    }
+
+    @Override
+    public Counter getCounter(String group, String name) {
+      return outer.getCounter(group, name);
+    }
+
+    @Override
+    public void progress() {
+      outer.progress();
+    }
+
+    @Override
+    public void setStatus(String status) {
+      outer.setStatus(status);
+    }
+
+    public float getProgress() {
+      return 0;
+    }
+  }
+
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="REC_CATCH_EXCEPTION",
+      justification="Don't understand why FB is complaining about this one. We do throw exception")
+  private class MapRunner implements Runnable {
+    private Mapper<ImmutableBytesWritable, Result, K2,V2> mapper;
+    private Context subcontext;
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    MapRunner(Context context) throws IOException, InterruptedException {
+      mapper = ReflectionUtils.newInstance(mapClass,
+          context.getConfiguration());
+      try {
+        Constructor c = context.getClass().getConstructor(
+          Mapper.class,
+          Configuration.class,
+          TaskAttemptID.class,
+          RecordReader.class,
+          RecordWriter.class,
+          OutputCommitter.class,
+          StatusReporter.class,
+          InputSplit.class);
+        c.setAccessible(true);
+        subcontext = (Context) c.newInstance(
+          mapper,
+          outer.getConfiguration(),
+          outer.getTaskAttemptID(),
+          new SubMapRecordReader(),
+          new SubMapRecordWriter(),
+          context.getOutputCommitter(),
+          new SubMapStatusReporter(),
+          outer.getInputSplit());
+      } catch (Exception e) {
+        try {
+          Constructor c = Class.forName("org.apache.hadoop.mapreduce.task.MapContextImpl").getConstructor(
+            Configuration.class,
+            TaskAttemptID.class,
+            RecordReader.class,
+            RecordWriter.class,
+            OutputCommitter.class,
+            StatusReporter.class,
+            InputSplit.class);
+          c.setAccessible(true);
+          MapContext mc = (MapContext) c.newInstance(
+            outer.getConfiguration(),
+            outer.getTaskAttemptID(),
+            new SubMapRecordReader(),
+            new SubMapRecordWriter(),
+            context.getOutputCommitter(),
+            new SubMapStatusReporter(),
+            outer.getInputSplit());
+          Class<?> wrappedMapperClass = Class.forName("org.apache.hadoop.mapreduce.lib.map.WrappedMapper");
+          Method getMapContext = wrappedMapperClass.getMethod("getMapContext", MapContext.class);
+          subcontext = (Context) getMapContext.invoke(wrappedMapperClass.newInstance(), mc);
+        } catch (Exception ee) { // FindBugs: REC_CATCH_EXCEPTION
+          // rethrow as IOE
+          throw new IOException(e);
+        }
+      }
+    }
+
+    @Override
+    public void run() {
+      try {
+        mapper.run(subcontext);
+      } catch (Throwable ie) {
+        LOG.error("Problem in running map.", ie);
+      }
+    }
+  }
+}
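
The run() method above starts a fixed pool of MapRunner threads, each driving a fresh instance of the configured mapper against a synchronized view of the outer context. A hedged sketch of wiring it into a job follows; the table name, thread count, and the inner RowTagMapper are illustrative assumptions (any delegate mapper must be thread-safe).

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.MultithreadedTableMapper;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

    public class MultithreadedMapperJob {

      // Hypothetical thread-safe mapper that just tags every row it sees.
      public static class RowTagMapper extends TableMapper<ImmutableBytesWritable, Text> {
        @Override
        protected void map(ImmutableBytesWritable row, Result value, Context context)
            throws IOException, InterruptedException {
          context.write(row, new Text("seen"));
        }
      }

      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf, "multithreaded-scan");

        // MultithreadedTableMapper is the mapper the framework runs; the real work
        // is delegated to RowTagMapper on a pool of 16 threads.
        TableMapReduceUtil.initTableMapperJob("my_table", new Scan(),
            MultithreadedTableMapper.class, ImmutableBytesWritable.class, Text.class, job);
        MultithreadedTableMapper.setMapperClass(job, RowTagMapper.class);
        MultithreadedTableMapper.setNumberOfThreads(job, 16);

        job.setOutputFormatClass(NullOutputFormat.class);
        job.setNumReduceTasks(0);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }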

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MutationSerialization.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MutationSerialization.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MutationSerialization.java
new file mode 100644
index 0000000..d5faab5
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MutationSerialization.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.Serialization;
+import org.apache.hadoop.io.serializer.Serializer;
+
+@InterfaceAudience.Public
+public class MutationSerialization implements Serialization<Mutation> {
+  @Override
+  public boolean accept(Class<?> c) {
+    return Mutation.class.isAssignableFrom(c);
+  }
+
+  @Override
+  public Deserializer<Mutation> getDeserializer(Class<Mutation> c) {
+    return new MutationDeserializer();
+  }
+
+  @Override
+  public Serializer<Mutation> getSerializer(Class<Mutation> c) {
+    return new MutationSerializer();
+  }
+
+  private static class MutationDeserializer implements Deserializer<Mutation> {
+    private InputStream in;
+
+    @Override
+    public void close() throws IOException {
+      in.close();
+    }
+
+    @Override
+    public Mutation deserialize(Mutation mutation) throws IOException {
+      MutationProto proto = MutationProto.parseDelimitedFrom(in);
+      return ProtobufUtil.toMutation(proto);
+    }
+
+    @Override
+    public void open(InputStream in) throws IOException {
+      this.in = in;
+    }
+
+  }
+  private static class MutationSerializer implements Serializer<Mutation> {
+    private OutputStream out;
+
+    @Override
+    public void close() throws IOException {
+      out.close();
+    }
+
+    @Override
+    public void open(OutputStream out) throws IOException {
+      this.out = out;
+    }
+
+    @Override
+    public void serialize(Mutation mutation) throws IOException {
+      MutationType type;
+      if (mutation instanceof Put) {
+        type = MutationType.PUT;
+      } else if (mutation instanceof Delete) {
+        type = MutationType.DELETE;
+      } else {
+        throw new IllegalArgumentException("Only Put and Delete are supported");
+      }
+      ProtobufUtil.toMutation(type, mutation).writeDelimitedTo(out);
+    }
+  }
+}
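
MutationSerialization plugs Puts and Deletes into Hadoop's pluggable serialization framework by writing each mutation as a length-delimited MutationProto. A minimal round-trip sketch using only the public accept/getSerializer/getDeserializer API follows; the row, family, and qualifier names are illustrative.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;

    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.serializer.Deserializer;
    import org.apache.hadoop.io.serializer.Serializer;

    public class MutationRoundTrip {
      public static void main(String[] args) throws Exception {
        MutationSerialization serialization = new MutationSerialization();

        Put put = new Put(Bytes.toBytes("row1"));
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"));

        // Serialize the Put as a delimited MutationProto.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        Serializer<Mutation> serializer = serialization.getSerializer(Mutation.class);
        serializer.open(bytes);
        serializer.serialize(put);
        serializer.close();

        // Read it back; the result is a Mutation reconstructed from the proto.
        Deserializer<Mutation> deserializer = serialization.getDeserializer(Mutation.class);
        deserializer.open(new ByteArrayInputStream(bytes.toByteArray()));
        Mutation copy = deserializer.deserialize(null);
        deserializer.close();

        System.out.println(Bytes.toString(copy.getRow())); // prints row1
      }
    }

In a job this class is normally registered through the io.serializations configuration property rather than invoked by hand.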

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/PutCombiner.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/PutCombiner.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/PutCombiner.java
new file mode 100644
index 0000000..f01e84f
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/PutCombiner.java
@@ -0,0 +1,98 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.mapreduce.Reducer;
+
+/**
+ * Combine Puts. Merges Put instances grouped by <code>K</code> into a single
+ * instance.
+ * @see TableMapReduceUtil
+ */
+@InterfaceAudience.Public
+public class PutCombiner<K> extends Reducer<K, Put, K, Put> {
+  private static final Log LOG = LogFactory.getLog(PutCombiner.class);
+
+  @Override
+  protected void reduce(K row, Iterable<Put> vals, Context context)
+      throws IOException, InterruptedException {
+    // Using HeapSize to create an upper bound on the memory size of
+    // the puts and flush some portion of the content while looping. This
+    // flush could result in multiple Puts for a single rowkey. That is
+    // acceptable because Combiner is run as an optimization and it's not
+    // critical that all Puts are grouped perfectly.
+    long threshold = context.getConfiguration().getLong(
+        "putcombiner.row.threshold", 1L * (1<<30));
+    int cnt = 0;
+    long curSize = 0;
+    Put put = null;
+    Map<byte[], List<Cell>> familyMap = null;
+    for (Put p : vals) {
+      cnt++;
+      if (put == null) {
+        put = p;
+        familyMap = put.getFamilyCellMap();
+      } else {
+        for (Entry<byte[], List<Cell>> entry : p.getFamilyCellMap()
+            .entrySet()) {
+          List<Cell> cells = familyMap.get(entry.getKey());
+          List<Cell> kvs = (cells != null) ? (List<Cell>) cells : null;
+          for (Cell cell : entry.getValue()) {
+            KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+            curSize += kv.heapSize();
+            if (kvs != null) {
+              kvs.add(kv);
+            }
+          }
+          if (cells == null) {
+            familyMap.put(entry.getKey(), entry.getValue());
+          }
+        }
+        if (cnt % 10 == 0) context.setStatus("Combine " + cnt);
+        if (curSize > threshold) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(String.format("Combined %d Put(s) into %d.", cnt, 1));
+          }
+          context.write(row, put);
+          put = null;
+          curSize = 0;
+          cnt = 0;
+        }
+      }
+    }
+    if (put != null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(String.format("Combined %d Put(s) into %d.", cnt, 1));
+      }
+      context.write(row, put);
+    }
+  }
+}
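
PutCombiner only reduces the volume of intermediate data: it merges the family/cell maps of Puts that share a key and flushes a partially merged Put whenever the configured heap threshold is crossed. A hedged sketch of wiring it into a job follows; the job name and threshold are illustrative, and the mapper that emits (ImmutableBytesWritable, Put) pairs is assumed, not shown.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.PutCombiner;
    import org.apache.hadoop.mapreduce.Job;

    public class PutCombinerWiring {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Flush combined Puts once roughly 256 MB of cell heap has accumulated
        // (the default in the code above is 1 GB).
        conf.setLong("putcombiner.row.threshold", 256L * 1024 * 1024);

        Job job = Job.getInstance(conf, "bulk-put-with-combiner");
        // Assumes a mapper (not shown) that emits (ImmutableBytesWritable, Put) pairs.
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        job.setCombinerClass(PutCombiner.class);
        // ... reducer / output format configuration as appropriate for the job ...
      }
    }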

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java
new file mode 100644
index 0000000..17ab9cb
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java
@@ -0,0 +1,147 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ArrayBackedTag;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
+import org.apache.hadoop.hbase.TagUtil;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.security.visibility.CellVisibility;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Emits sorted KeyValues.
+ * Reads in all Puts from the passed Iterator, sorts the cells they contain, then emits
+ * KeyValues in sorted order.  If there are lots of columns per row, it will use lots of
+ * memory sorting.
+ * @see HFileOutputFormat2
+ * @see KeyValueSortReducer
+ */
+@InterfaceAudience.Public
+public class PutSortReducer extends
+    Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue> {
+  // the cell creator
+  private CellCreator kvCreator;
+
+  @Override
+  protected void
+      setup(Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue>.Context context)
+          throws IOException, InterruptedException {
+    Configuration conf = context.getConfiguration();
+    this.kvCreator = new CellCreator(conf);
+  }
+
+  @Override
+  protected void reduce(
+      ImmutableBytesWritable row,
+      java.lang.Iterable<Put> puts,
+      Reducer<ImmutableBytesWritable, Put,
+              ImmutableBytesWritable, KeyValue>.Context context)
+      throws java.io.IOException, InterruptedException
+  {
+    // although reduce() is called per-row, handle pathological case
+    long threshold = context.getConfiguration().getLong(
+        "putsortreducer.row.threshold", 1L * (1<<30));
+    Iterator<Put> iter = puts.iterator();
+    while (iter.hasNext()) {
+      TreeSet<KeyValue> map = new TreeSet<>(CellComparator.COMPARATOR);
+      long curSize = 0;
+      // stop at the end or the RAM threshold
+      List<Tag> tags = new ArrayList<>();
+      while (iter.hasNext() && curSize < threshold) {
+        // clear the tags
+        tags.clear();
+        Put p = iter.next();
+        long t = p.getTTL();
+        if (t != Long.MAX_VALUE) {
+          // add TTL tag if found
+          tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(t)));
+        }
+        byte[] acl = p.getACL();
+        if (acl != null) {
+          // add ACL tag if found
+          tags.add(new ArrayBackedTag(TagType.ACL_TAG_TYPE, acl));
+        }
+        try {
+          CellVisibility cellVisibility = p.getCellVisibility();
+          if (cellVisibility != null) {
+            // add the visibility labels if any
+            tags.addAll(kvCreator.getVisibilityExpressionResolver()
+                .createVisibilityExpTags(cellVisibility.getExpression()));
+          }
+        } catch (DeserializationException e) {
+          // We just throw exception here. Should we allow other mutations to proceed by
+          // just ignoring the bad one?
+          throw new IOException("Invalid visibility expression found in mutation " + p, e);
+        }
+        for (List<Cell> cells: p.getFamilyCellMap().values()) {
+          for (Cell cell: cells) {
+            // Creating the KV which needs to be directly written to HFiles. Using the Facade
+            // KVCreator for creation of kvs.
+            KeyValue kv = null;
+            TagUtil.carryForwardTags(tags, cell);
+            if (!tags.isEmpty()) {
+              kv = (KeyValue) kvCreator.create(cell.getRowArray(), cell.getRowOffset(),
+                cell.getRowLength(), cell.getFamilyArray(), cell.getFamilyOffset(),
+                cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(),
+                cell.getQualifierLength(), cell.getTimestamp(), cell.getValueArray(),
+                cell.getValueOffset(), cell.getValueLength(), tags);
+            } else {
+              kv = KeyValueUtil.ensureKeyValue(cell);
+            }
+            if (map.add(kv)) {// don't count duplicated kv into size
+              curSize += kv.heapSize();
+            }
+          }
+        }
+      }
+      context.setStatus("Read " + map.size() + " entries of " + map.getClass()
+          + "(" + StringUtils.humanReadableInt(curSize) + ")");
+      int index = 0;
+      for (KeyValue kv : map) {
+        context.write(row, kv);
+        if (++index % 100 == 0)
+          context.setStatus("Wrote " + index);
+      }
+
+      // if we have more entries to process
+      if (iter.hasNext()) {
+        // force flush because we cannot guarantee intra-row sorted order
+        context.write(null, null);
+      }
+    }
+  }
+}
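
PutSortReducer exists for the HFileOutputFormat2 bulk-load path: it turns the Puts emitted for a row into tag-carrying KeyValues and writes them out in sorted order. A hedged driver sketch follows; the table name and paths are illustrative, and it relies on configureIncrementalLoad selecting PutSortReducer when the map output value class is Put.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.RegionLocator;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class PutSortBulkLoadDriver {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf, "bulk-load-hfiles");

        // Assumes a mapper (not shown) that emits (ImmutableBytesWritable, Put).
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);

        TableName name = TableName.valueOf("my_table");
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(name);
             RegionLocator locator = conn.getRegionLocator(name)) {
          // Picks PutSortReducer as the reducer because the map output value is Put,
          // and configures the total-order partitioner over the table's regions.
          HFileOutputFormat2.configureIncrementalLoad(job, table, locator);
        }
        FileOutputFormat.setOutputPath(job, new Path("/tmp/hfiles-out"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }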

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/RegionSizeCalculator.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/RegionSizeCalculator.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/RegionSizeCalculator.java
new file mode 100644
index 0000000..f14cd90
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/RegionSizeCalculator.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLoad;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Computes the size of each region of the given table.
+ * The value is used by MapReduce for better scheduling.
+ * */
+@InterfaceAudience.Private
+public class RegionSizeCalculator {
+
+  private static final Log LOG = LogFactory.getLog(RegionSizeCalculator.class);
+
+  /**
+   * Maps each region to its size in bytes.
+   * */
+  private final Map<byte[], Long> sizeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+
+  static final String ENABLE_REGIONSIZECALCULATOR = "hbase.regionsizecalculator.enable";
+  private static final long MEGABYTE = 1024L * 1024L;
+
+  /**
+   * Computes the size of each region of the table referenced by the given region locator.
+   * */
+  public RegionSizeCalculator(RegionLocator regionLocator, Admin admin) throws IOException {
+    init(regionLocator, admin);
+  }
+
+  private void init(RegionLocator regionLocator, Admin admin)
+      throws IOException {
+    if (!enabled(admin.getConfiguration())) {
+      LOG.info("Region size calculation disabled.");
+      return;
+    }
+
+    if (regionLocator.getName().isSystemTable()) {
+      LOG.info("Region size calculation disabled for system tables.");
+      return;
+    }
+
+    LOG.info("Calculating region sizes for table \"" + regionLocator.getName() + "\".");
+
+    // Get the servers which host regions of the table
+    Set<ServerName> tableServers = getRegionServersOfTable(regionLocator);
+
+    for (ServerName tableServerName : tableServers) {
+      Map<byte[], RegionLoad> regionLoads =
+          admin.getRegionLoad(tableServerName, regionLocator.getName());
+      for (RegionLoad regionLoad : regionLoads.values()) {
+
+        byte[] regionId = regionLoad.getName();
+        long regionSizeBytes = regionLoad.getStorefileSizeMB() * MEGABYTE;
+        sizeMap.put(regionId, regionSizeBytes);
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Region " + regionLoad.getNameAsString() + " has size " + regionSizeBytes);
+        }
+      }
+    }
+    LOG.debug("Region sizes calculated");
+  }
+
+  private Set<ServerName> getRegionServersOfTable(RegionLocator regionLocator)
+      throws IOException {
+
+    Set<ServerName> tableServers = Sets.newHashSet();
+    for (HRegionLocation regionLocation : regionLocator.getAllRegionLocations()) {
+      tableServers.add(regionLocation.getServerName());
+    }
+    return tableServers;
+  }
+
+  boolean enabled(Configuration configuration) {
+    return configuration.getBoolean(ENABLE_REGIONSIZECALCULATOR, true);
+  }
+
+  /**
+   * Returns size of given region in bytes. Returns 0 if region was not found.
+   * */
+  public long getRegionSize(byte[] regionId) {
+    Long size = sizeMap.get(regionId);
+    if (size == null) {
+      LOG.debug("Unknown region:" + Arrays.toString(regionId));
+      return 0;
+    } else {
+      return size;
+    }
+  }
+
+  public Map<byte[], Long> getRegionSizeMap() {
+    return Collections.unmodifiableMap(sizeMap);
+  }
+}
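
RegionSizeCalculator is @InterfaceAudience.Private and is used internally by the table input formats to weight splits, but a small standalone sketch shows its contract; the table name is illustrative.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.RegionLocator;
    import org.apache.hadoop.hbase.mapreduce.RegionSizeCalculator;
    import org.apache.hadoop.hbase.util.Bytes;

    public class RegionSizeReport {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin();
             RegionLocator locator = conn.getRegionLocator(TableName.valueOf("my_table"))) {
          // Sizes come from RegionLoad (store file sizes) reported by each region server.
          RegionSizeCalculator calc = new RegionSizeCalculator(locator, admin);
          calc.getRegionSizeMap().forEach((regionName, sizeBytes) ->
              System.out.println(Bytes.toStringBinary(regionName) + " -> " + sizeBytes + " bytes"));
        }
      }
    }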

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ResultSerialization.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ResultSerialization.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ResultSerialization.java
new file mode 100644
index 0000000..dff04b6
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ResultSerialization.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.Serialization;
+import org.apache.hadoop.io.serializer.Serializer;
+
+@InterfaceAudience.Public
+public class ResultSerialization extends Configured implements Serialization<Result> {
+  private static final Log LOG = LogFactory.getLog(ResultSerialization.class);
+  // The following configuration property indicates import file format version.
+  public static final String IMPORT_FORMAT_VER = "hbase.import.version";
+
+  @Override
+  public boolean accept(Class<?> c) {
+    return Result.class.isAssignableFrom(c);
+  }
+
+  @Override
+  public Deserializer<Result> getDeserializer(Class<Result> c) {
+    // check input format version
+    Configuration conf = getConf();
+    if (conf != null) {
+      String inputVersion = conf.get(IMPORT_FORMAT_VER);
+      if (inputVersion != null && inputVersion.equals("0.94")) {
+        LOG.info("Load exported file using deserializer for HBase 0.94 format");
+        return new Result94Deserializer();
+      }
+    }
+
+    return new ResultDeserializer();
+  }
+
+  @Override
+  public Serializer<Result> getSerializer(Class<Result> c) {
+    return new ResultSerializer();
+  }
+
+  /**
+   * Deserializer used to load files exported by HBase 0.94.
+   */
+  private static class Result94Deserializer implements Deserializer<Result> {
+    private DataInputStream in;
+
+    @Override
+    public void close() throws IOException {
+      in.close();
+    }
+
+    @Override
+    public Result deserialize(Result mutation) throws IOException {
+      int totalBuffer = in.readInt();
+      if (totalBuffer == 0) {
+        return Result.EMPTY_RESULT;
+      }
+      byte[] buf = new byte[totalBuffer];
+      readChunked(in, buf, 0, totalBuffer);
+      List<Cell> kvs = new ArrayList<>();
+      int offset = 0;
+      while (offset < totalBuffer) {
+        int keyLength = Bytes.toInt(buf, offset);
+        offset += Bytes.SIZEOF_INT;
+        kvs.add(new KeyValue(buf, offset, keyLength));
+        offset += keyLength;
+      }
+      return Result.create(kvs);
+    }
+
+    @Override
+    public void open(InputStream in) throws IOException {
+      if (!(in instanceof DataInputStream)) {
+        throw new IOException("Wrong input stream instance passed in");
+      }
+      this.in = (DataInputStream) in;
+    }
+
+    private void readChunked(final DataInput in, byte[] dest, int ofs, int len) throws IOException {
+      int maxRead = 8192;
+
+      for (; ofs < len; ofs += maxRead)
+        in.readFully(dest, ofs, Math.min(len - ofs, maxRead));
+    }
+  }
+
+  private static class ResultDeserializer implements Deserializer<Result> {
+    private InputStream in;
+
+    @Override
+    public void close() throws IOException {
+      in.close();
+    }
+
+    @Override
+    public Result deserialize(Result mutation) throws IOException {
+      ClientProtos.Result proto = ClientProtos.Result.parseDelimitedFrom(in);
+      return ProtobufUtil.toResult(proto);
+    }
+
+    @Override
+    public void open(InputStream in) throws IOException {
+      this.in = in;
+    }
+  }
+
+  private static class ResultSerializer implements Serializer<Result> {
+    private OutputStream out;
+
+    @Override
+    public void close() throws IOException {
+      out.close();
+    }
+
+    @Override
+    public void open(OutputStream out) throws IOException {
+      this.out = out;
+    }
+
+    @Override
+    public void serialize(Result result) throws IOException {
+      ProtobufUtil.toResult(result).writeDelimitedTo(out);
+    }
+  }
+}
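
ResultSerialization mirrors MutationSerialization for Result values, with one extra switch: setting hbase.import.version to 0.94 selects a deserializer for files exported by HBase 0.94. A minimal round-trip sketch of the current-format path follows; the row, family, and qualifier are illustrative.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.util.Collections;

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.serializer.Deserializer;
    import org.apache.hadoop.io.serializer.Serializer;

    public class ResultRoundTrip {
      public static void main(String[] args) throws Exception {
        ResultSerialization serialization = new ResultSerialization();
        serialization.setConf(HBaseConfiguration.create());

        Cell cell = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("cf"),
            Bytes.toBytes("q"), Bytes.toBytes("v"));
        Result original = Result.create(Collections.singletonList(cell));

        // Serialize the Result as a delimited ClientProtos.Result.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        Serializer<Result> serializer = serialization.getSerializer(Result.class);
        serializer.open(bytes);
        serializer.serialize(original);
        serializer.close();

        // Read it back with the current-format deserializer.
        Deserializer<Result> deserializer = serialization.getDeserializer(Result.class);
        deserializer.open(new ByteArrayInputStream(bytes.toByteArray()));
        Result copy = deserializer.deserialize(null);
        deserializer.close();

        System.out.println(Bytes.toString(copy.getRow())); // prints row1
      }
    }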

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java
new file mode 100644
index 0000000..2e0591e
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java
@@ -0,0 +1,265 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.hadoop.hbase.filter.MultiRowRangeFilter;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * A job with just a map phase to count rows. The map counts every
+ * input row that has columns with content.
+ */
+@InterfaceAudience.Public
+public class RowCounter extends Configured implements Tool {
+
+  private static final Log LOG = LogFactory.getLog(RowCounter.class);
+
+  /** Name of this 'program'. */
+  static final String NAME = "rowcounter";
+
+  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
+  private final static String EXPECTED_COUNT_KEY = RowCounter.class.getName() + ".expected_count";
+
+  /**
+   * Mapper that runs the count.
+   */
+  static class RowCounterMapper
+  extends TableMapper<ImmutableBytesWritable, Result> {
+
+    /** Counter enumeration to count the actual rows. */
+    public static enum Counters {ROWS}
+
+    /**
+     * Maps the data.
+     *
+     * @param row  The current table row key.
+     * @param values  The columns.
+     * @param context  The current context.
+     * @throws IOException When something is broken with the data.
+     * @see org.apache.hadoop.mapreduce.Mapper#map(Object, Object, Context)
+     */
+    @Override
+    public void map(ImmutableBytesWritable row, Result values,
+      Context context)
+    throws IOException {
+      // Count every row containing data, whether it's in qualifiers or values
+      context.getCounter(Counters.ROWS).increment(1);
+    }
+  }
+
+  /**
+   * Sets up the actual job.
+   *
+   * @param conf  The current configuration.
+   * @param args  The command line parameters.
+   * @return The newly created job.
+   * @throws IOException When setting up the job fails.
+   */
+  public static Job createSubmittableJob(Configuration conf, String[] args)
+  throws IOException {
+    String tableName = args[0];
+    List<MultiRowRangeFilter.RowRange> rowRangeList = null;
+    long startTime = 0;
+    long endTime = 0;
+
+    StringBuilder sb = new StringBuilder();
+
+    final String rangeSwitch = "--range=";
+    final String startTimeArgKey = "--starttime=";
+    final String endTimeArgKey = "--endtime=";
+    final String expectedCountArg = "--expected-count=";
+
+    // First argument is table name, starting from second
+    for (int i = 1; i < args.length; i++) {
+      if (args[i].startsWith(rangeSwitch)) {
+        try {
+          rowRangeList = parseRowRangeParameter(args[i], rangeSwitch);
+        } catch (IllegalArgumentException e) {
+          return null;
+        }
+        continue;
+      }
+      if (args[i].startsWith(startTimeArgKey)) {
+        startTime = Long.parseLong(args[i].substring(startTimeArgKey.length()));
+        continue;
+      }
+      if (args[i].startsWith(endTimeArgKey)) {
+        endTime = Long.parseLong(args[i].substring(endTimeArgKey.length()));
+        continue;
+      }
+      if (args[i].startsWith(expectedCountArg)) {
+        conf.setLong(EXPECTED_COUNT_KEY,
+            Long.parseLong(args[i].substring(expectedCountArg.length())));
+        continue;
+      }
+      // if no switch, assume column names
+      sb.append(args[i]);
+      sb.append(" ");
+    }
+    if (endTime < startTime) {
+      printUsage("--endtime=" + endTime + " needs to be greater than --starttime=" + startTime);
+      return null;
+    }
+
+    Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
+    job.setJarByClass(RowCounter.class);
+    Scan scan = new Scan();
+    scan.setCacheBlocks(false);
+    setScanFilter(scan, rowRangeList);
+    if (sb.length() > 0) {
+      for (String columnName : sb.toString().trim().split(" ")) {
+        String family = StringUtils.substringBefore(columnName, ":");
+        String qualifier = StringUtils.substringAfter(columnName, ":");
+
+        if (StringUtils.isBlank(qualifier)) {
+          scan.addFamily(Bytes.toBytes(family));
+        }
+        else {
+          scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
+        }
+      }
+    }
+    scan.setTimeRange(startTime, endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
+    job.setOutputFormatClass(NullOutputFormat.class);
+    TableMapReduceUtil.initTableMapperJob(tableName, scan,
+      RowCounterMapper.class, ImmutableBytesWritable.class, Result.class, job);
+    job.setNumReduceTasks(0);
+    return job;
+  }
+
+  private static List<MultiRowRangeFilter.RowRange> parseRowRangeParameter(
+    String arg, String rangeSwitch) {
+    final String[] ranges = arg.substring(rangeSwitch.length()).split(";");
+    final List<MultiRowRangeFilter.RowRange> rangeList = new ArrayList<>();
+    for (String range : ranges) {
+      String[] startEnd = range.split(",", 2);
+      if (startEnd.length != 2 || startEnd[1].contains(",")) {
+        printUsage("Please specify range in such format as \"--range=a,b\" " +
+            "or, with only one boundary, \"--range=,b\" or \"--range=a,\"");
+        throw new IllegalArgumentException("Wrong range specification: " + range);
+      }
+      String startKey = startEnd[0];
+      String endKey = startEnd[1];
+      rangeList.add(new MultiRowRangeFilter.RowRange(
+        Bytes.toBytesBinary(startKey), true,
+        Bytes.toBytesBinary(endKey), false));
+    }
+    return rangeList;
+  }
+
+  /**
+   * Sets a {@link FilterBase} filter on the {@link Scan} instance.
+   * If the provided rowRangeList contains more than one element,
+   * a {@link MultiRowRangeFilter} is set on the scan.
+   * Otherwise a {@link FirstKeyOnlyFilter} is set.
+   * If rowRangeList contains exactly one element, startRow and stopRow are also set on the scan.
+   * @param scan the scan to configure
+   * @param rowRangeList the row ranges to scan, or null to scan the whole table
+   */
+  private static void setScanFilter(Scan scan, List<MultiRowRangeFilter.RowRange> rowRangeList) {
+    final int size = rowRangeList == null ? 0 : rowRangeList.size();
+    if (size <= 1) {
+      scan.setFilter(new FirstKeyOnlyFilter());
+    }
+    if (size == 1) {
+      MultiRowRangeFilter.RowRange range = rowRangeList.get(0);
+      scan.setStartRow(range.getStartRow()); //inclusive
+      scan.setStopRow(range.getStopRow());   //exclusive
+    } else if (size > 1) {
+      scan.setFilter(new MultiRowRangeFilter(rowRangeList));
+    }
+  }
+
+  /*
+   * @param errorMessage error message to display before printing usage.
+   */
+  private static void printUsage(String errorMessage) {
+    System.err.println("ERROR: " + errorMessage);
+    printUsage();
+  }
+
+  /**
+   * Prints usage without error message.
+   * Note that we don't document --expected-count, because it's intended for test.
+   */
+  private static void printUsage() {
+    System.err.println("Usage: RowCounter [options] <tablename> " +
+        "[--starttime=[start] --endtime=[end] " +
+        "[--range=[startKey],[endKey][;[startKey],[endKey]...]] [<column1> <column2>...]");
+    System.err.println("For performance consider the following options:\n"
+        + "-Dhbase.client.scanner.caching=100\n"
+        + "-Dmapreduce.map.speculative=false");
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    if (args.length < 1) {
+      printUsage("Wrong number of parameters: " + args.length);
+      return -1;
+    }
+    Job job = createSubmittableJob(getConf(), args);
+    if (job == null) {
+      return -1;
+    }
+    boolean success = job.waitForCompletion(true);
+    final long expectedCount = getConf().getLong(EXPECTED_COUNT_KEY, -1);
+    if (success && expectedCount != -1) {
+      final Counter counter = job.getCounters().findCounter(RowCounterMapper.Counters.ROWS);
+      success = expectedCount == counter.getValue();
+      if (!success) {
+        LOG.error("Failing job because count of '" + counter.getValue() +
+            "' does not match expected count of '" + expectedCount + "'");
+      }
+    }
+    return (success ? 0 : 1);
+  }
+
+  /**
+   * Main entry point.
+   * @param args The command line parameters.
+   * @throws Exception When running the job fails.
+   */
+  public static void main(String[] args) throws Exception {
+    int errCode = ToolRunner.run(HBaseConfiguration.create(), new RowCounter(), args);
+    System.exit(errCode);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/SimpleTotalOrderPartitioner.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/SimpleTotalOrderPartitioner.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/SimpleTotalOrderPartitioner.java
new file mode 100644
index 0000000..01a919c
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/SimpleTotalOrderPartitioner.java
@@ -0,0 +1,143 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Base64;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Partitioner;
+
+/**
+ * A partitioner that takes start and end keys and uses bigdecimal to figure
+ * out which reducer a key belongs to.  Pass the start and end
+ * keys in the Configuration using <code>hbase.simpletotalorder.start</code>
+ * and <code>hbase.simpletotalorder.end</code>.  The end key needs to be
+ * exclusive; i.e. one larger than the biggest key in your key space.
+ * You may be surprised at how this class partitions the space; it may not
+ * align with preconceptions; e.g. a start key of zero and an end key of 100
+ * divided in ten will not make regions whose range is 0-10, 10-20, and so on.
+ * Make your own partitioner if you need the region spacing to come out a
+ * particular way.
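+ * <p>
+ * A minimal configuration sketch (illustrative only; assumes an existing
+ * <code>Job</code> named <code>job</code> and the made-up boundary keys
+ * <code>aaa</code> and <code>zzz</code>):
+ * <pre>
+ * Configuration conf = job.getConfiguration();
+ * SimpleTotalOrderPartitioner.setStartKey(conf, Bytes.toBytes("aaa"));
+ * SimpleTotalOrderPartitioner.setEndKey(conf, Bytes.toBytes("zzz"));
+ * job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
+ * </pre>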
+ * @param <VALUE> the type of the map output values being partitioned
+ * @see #START
+ * @see #END
+ */
+@InterfaceAudience.Public
+public class SimpleTotalOrderPartitioner<VALUE> extends Partitioner<ImmutableBytesWritable, VALUE>
+implements Configurable {
+  private final static Log LOG = LogFactory.getLog(SimpleTotalOrderPartitioner.class);
+
+  @Deprecated
+  public static final String START = "hbase.simpletotalorder.start";
+  @Deprecated
+  public static final String END = "hbase.simpletotalorder.end";
+
+  static final String START_BASE64 = "hbase.simpletotalorder.start.base64";
+  static final String END_BASE64 = "hbase.simpletotalorder.end.base64";
+
+  private Configuration c;
+  private byte [] startkey;
+  private byte [] endkey;
+  private byte [][] splits;
+  private int lastReduces = -1;
+
+  public static void setStartKey(Configuration conf, byte[] startKey) {
+    conf.set(START_BASE64, Base64.encodeBytes(startKey));
+  }
+
+  public static void setEndKey(Configuration conf, byte[] endKey) {
+    conf.set(END_BASE64, Base64.encodeBytes(endKey));
+  }
+
+  @SuppressWarnings("deprecation")
+  static byte[] getStartKey(Configuration conf) {
+    return getKeyFromConf(conf, START_BASE64, START);
+  }
+
+  @SuppressWarnings("deprecation")
+  static byte[] getEndKey(Configuration conf) {
+    return getKeyFromConf(conf, END_BASE64, END);
+  }
+
+  private static byte[] getKeyFromConf(Configuration conf,
+      String base64Key, String deprecatedKey) {
+    String encoded = conf.get(base64Key);
+    if (encoded != null) {
+      return Base64.decode(encoded);
+    }
+    String oldStyleVal = conf.get(deprecatedKey);
+    if (oldStyleVal == null) {
+      return null;
+    }
+    LOG.warn("Using deprecated configuration " + deprecatedKey +
+        " - please use static accessor methods instead.");
+    return Bytes.toBytesBinary(oldStyleVal);
+  }
+
+  @Override
+  public int getPartition(final ImmutableBytesWritable key, final VALUE value,
+      final int reduces) {
+    if (reduces == 1) return 0;
+    if (this.lastReduces != reduces) {
+      this.splits = Bytes.split(this.startkey, this.endkey, reduces - 1);
+      for (int i = 0; i < splits.length; i++) {
+        LOG.info(Bytes.toStringBinary(splits[i]));
+      }
+      this.lastReduces = reduces;
+    }
+    int pos = Bytes.binarySearch(this.splits, key.get(), key.getOffset(),
+      key.getLength());
+    // Below code is from hfile index search.
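+    // Bytes.binarySearch follows the Arrays.binarySearch convention: a negative
+    // result encodes the insertion point as -(insertionPoint + 1). The arithmetic
+    // below recovers the index of the greatest split below the key, or fails if
+    // the key sorts before the first split.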
+    if (pos < 0) {
+      pos++;
+      pos *= -1;
+      if (pos == 0) {
+        // Falls before the first split, i.e. before the configured start key.
+        throw new RuntimeException("Key outside start/stop range: " +
+          key.toString());
+      }
+      pos--;
+    }
+    return pos;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return this.c;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.c = conf;
+    this.startkey = getStartKey(conf);
+    this.endkey = getEndKey(conf);
+    if (startkey == null || endkey == null) {
+      throw new RuntimeException(this.getClass() + " not configured");
+    }
+    LOG.info("startkey=" + Bytes.toStringBinary(startkey) +
+        ", endkey=" + Bytes.toStringBinary(endkey));
+    // Reset last reduces count on change of Start / End key
+    this.lastReduces = -1;
+  }
+}

