hbase-commits mailing list archives

From: jxi...@apache.org
Subject: svn commit: r1570714 - in /hbase/branches/0.96/hbase-server/src: main/java/org/apache/hadoop/hbase/mapreduce/ test/java/org/apache/hadoop/hbase/mapreduce/
Date: Fri, 21 Feb 2014 21:29:13 GMT
Author: jxiang
Date: Fri Feb 21 21:29:12 2014
New Revision: 1570714

URL: http://svn.apache.org/r1570714
Log:
HBASE-10526 Using Cell instead of KeyValue in HFileOutputFormat

Added:
    hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java   (with props)
    hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java   (with props)
Modified:
    hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
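
[Editorial note, not part of the commit] For readers coming to this change from the release notes: HFileOutputFormat2 (added below) is the class to target going forward, while HFileOutputFormat is deprecated and delegates to it. A minimal sketch of wiring the new class into a bulk-load preparation job, assuming a pre-existing table named "mytable" and a made-up CSV-to-Put mapper; everything outside the org.apache.hadoop.* identifiers is illustrative, not part of this commit.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class BulkLoadPrepareSketch {

  /** Hypothetical mapper: turns "row,family,qualifier,value" lines into Puts. */
  static class CsvToPutMapper
      extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
    @Override
    protected void map(LongWritable offset, Text line, Context ctx)
        throws IOException, InterruptedException {
      String[] f = line.toString().split(",");
      if (f.length != 4) return;                       // skip malformed lines
      byte[] row = Bytes.toBytes(f[0]);
      Put put = new Put(row);
      put.add(Bytes.toBytes(f[1]), Bytes.toBytes(f[2]), Bytes.toBytes(f[3]));
      ctx.write(new ImmutableBytesWritable(row), put);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = new Job(conf, "prepare-hfiles");         // 0.96-era Job constructor, as used in the tests below
    job.setJarByClass(BulkLoadPrepareSketch.class);

    job.setInputFormatClass(TextInputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));

    job.setMapperClass(CsvToPutMapper.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(Put.class);

    // configureIncrementalLoad sets the output format and output key/value classes,
    // the TotalOrderPartitioner (one partition per region), the PutSortReducer
    // (chosen from the map output value class), and the per-family compression,
    // bloom-type and block-size strings read back by the record writer.
    HTable table = new HTable(conf, "mytable");        // hypothetical existing table
    HFileOutputFormat2.configureIncrementalLoad(job, table);

    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}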

Modified: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java?rev=1570714&r1=1570713&r2=1570714&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java (original)
+++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java Fri Feb 21 21:29:12 2014
@@ -19,51 +19,21 @@
 package org.apache.hadoop.hbase.mapreduce;
 
 import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.net.URLDecoder;
-import java.net.URLEncoder;
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.UUID;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.io.compress.Compression;
-import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
-import org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
-import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
-import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
-import org.apache.hadoop.hbase.regionserver.BloomType;
-import org.apache.hadoop.hbase.regionserver.HStore;
-import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
 
 /**
  * Writes HFiles. Passed KeyValues must arrive in order.
@@ -74,237 +44,17 @@ import org.apache.hadoop.mapreduce.lib.p
  * Using this class as part of a MapReduce job is best done
  * using {@link #configureIncrementalLoad(Job, HTable)}.
  * @see KeyValueSortReducer
+ * @deprecated use {@link HFileOutputFormat2} instead.
  */
+@Deprecated
 @InterfaceAudience.Public
 @InterfaceStability.Stable
 public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, KeyValue> {
   static Log LOG = LogFactory.getLog(HFileOutputFormat.class);
-  static final String COMPRESSION_CONF_KEY = "hbase.hfileoutputformat.families.compression";
-  private static final String BLOOM_TYPE_CONF_KEY = "hbase.hfileoutputformat.families.bloomtype";
-  private static final String DATABLOCK_ENCODING_CONF_KEY =
-     "hbase.mapreduce.hfileoutputformat.datablock.encoding";
-  private static final String BLOCK_SIZE_CONF_KEY = "hbase.mapreduce.hfileoutputformat.blocksize";
 
   public RecordWriter<ImmutableBytesWritable, KeyValue> getRecordWriter(final TaskAttemptContext context)
   throws IOException, InterruptedException {
-    // Get the path of the temporary output file
-    final Path outputPath = FileOutputFormat.getOutputPath(context);
-    final Path outputdir = new FileOutputCommitter(outputPath, context).getWorkPath();
-    final Configuration conf = context.getConfiguration();
-    final FileSystem fs = outputdir.getFileSystem(conf);
-    // These configs. are from hbase-*.xml
-    final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
-        HConstants.DEFAULT_MAX_FILE_SIZE);
-    // Invented config.  Add to hbase-*.xml if other than default compression.
-    final String defaultCompression = conf.get("hfile.compression",
-        Compression.Algorithm.NONE.getName());
-    final boolean compactionExclude = conf.getBoolean(
-        "hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
-
-    // create a map from column family to the compression algorithm
-    final Map<byte[], String> compressionMap = createFamilyCompressionMap(conf);
-    final Map<byte[], String> bloomTypeMap = createFamilyBloomMap(conf);
-    final Map<byte[], String> blockSizeMap = createFamilyBlockSizeMap(conf);
-
-    String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_CONF_KEY);
-    final HFileDataBlockEncoder encoder;
-    if (dataBlockEncodingStr == null) {
-      encoder = NoOpDataBlockEncoder.INSTANCE;
-    } else {
-      try {
-        encoder = new HFileDataBlockEncoderImpl(DataBlockEncoding
-            .valueOf(dataBlockEncodingStr));
-      } catch (IllegalArgumentException ex) {
-        throw new RuntimeException(
-            "Invalid data block encoding type configured for the param "
-                + DATABLOCK_ENCODING_CONF_KEY + " : " + dataBlockEncodingStr);
-      }
-    }
-
-    return new RecordWriter<ImmutableBytesWritable, KeyValue>() {
-      // Map of families to writers and how much has been output on the writer.
-      private final Map<byte [], WriterLength> writers =
-        new TreeMap<byte [], WriterLength>(Bytes.BYTES_COMPARATOR);
-      private byte [] previousRow = HConstants.EMPTY_BYTE_ARRAY;
-      private final byte [] now = Bytes.toBytes(System.currentTimeMillis());
-      private boolean rollRequested = false;
-
-      public void write(ImmutableBytesWritable row, KeyValue kv)
-      throws IOException {
-        // null input == user explicitly wants to flush
-        if (row == null && kv == null) {
-          rollWriters();
-          return;
-        }
-
-        byte [] rowKey = kv.getRow();
-        long length = kv.getLength();
-        byte [] family = kv.getFamily();
-        WriterLength wl = this.writers.get(family);
-
-        // If this is a new column family, verify that the directory exists
-        if (wl == null) {
-          fs.mkdirs(new Path(outputdir, Bytes.toString(family)));
-        }
-
-        // If any of the HFiles for the column families has reached
-        // maxsize, we need to roll all the writers
-        if (wl != null && wl.written + length >= maxsize) {
-          this.rollRequested = true;
-        }
-
-        // This can only happen once a row is finished though
-        if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
-          rollWriters();
-        }
-
-        // create a new HLog writer, if necessary
-        if (wl == null || wl.writer == null) {
-          wl = getNewWriter(family, conf);
-        }
-
-        // we now have the proper HLog writer. full steam ahead
-        kv.updateLatestStamp(this.now);
-        wl.writer.append(kv);
-        wl.written += length;
-
-        // Copy the row so we know when a row transition.
-        this.previousRow = rowKey;
-      }
-
-      private void rollWriters() throws IOException {
-        for (WriterLength wl : this.writers.values()) {
-          if (wl.writer != null) {
-            LOG.info("Writer=" + wl.writer.getPath() +
-                ((wl.written == 0)? "": ", wrote=" + wl.written));
-            close(wl.writer);
-          }
-          wl.writer = null;
-          wl.written = 0;
-        }
-        this.rollRequested = false;
-      }
-
-      /* Create a new StoreFile.Writer.
-       * @param family
-       * @return A WriterLength, containing a new StoreFile.Writer.
-       * @throws IOException
-       */
-      private WriterLength getNewWriter(byte[] family, Configuration conf)
-          throws IOException {
-        WriterLength wl = new WriterLength();
-        Path familydir = new Path(outputdir, Bytes.toString(family));
-        String compression = compressionMap.get(family);
-        compression = compression == null ? defaultCompression : compression;
-        String bloomTypeStr = bloomTypeMap.get(family);
-        BloomType bloomType = BloomType.NONE;
-        if (bloomTypeStr != null) {
-          bloomType = BloomType.valueOf(bloomTypeStr);
-        }
-        String blockSizeString = blockSizeMap.get(family);
-        int blockSize = blockSizeString == null ? HConstants.DEFAULT_BLOCKSIZE
-            : Integer.parseInt(blockSizeString);
-        Configuration tempConf = new Configuration(conf);
-        tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
-        wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs, blockSize)
-            .withOutputDir(familydir)
-            .withCompression(AbstractHFileWriter.compressionByName(compression))
-            .withBloomType(bloomType)
-            .withComparator(KeyValue.COMPARATOR)
-            .withDataBlockEncoder(encoder)
-            .withChecksumType(HStore.getChecksumType(conf))
-            .withBytesPerChecksum(HStore.getBytesPerChecksum(conf))
-            .build();
-
-        this.writers.put(family, wl);
-        return wl;
-      }
-
-      private void close(final StoreFile.Writer w) throws IOException {
-        if (w != null) {
-          w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
-              Bytes.toBytes(System.currentTimeMillis()));
-          w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
-              Bytes.toBytes(context.getTaskAttemptID().toString()));
-          w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
-              Bytes.toBytes(true));
-          w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
-              Bytes.toBytes(compactionExclude));
-          w.appendTrackedTimestampsToMetadata();
-          w.close();
-        }
-      }
-
-      public void close(TaskAttemptContext c)
-      throws IOException, InterruptedException {
-        for (WriterLength wl: this.writers.values()) {
-          close(wl.writer);
-        }
-      }
-    };
-  }
-
-  /*
-   * Data structure to hold a Writer and amount of data written on it.
-   */
-  static class WriterLength {
-    long written = 0;
-    StoreFile.Writer writer = null;
-  }
-
-  /**
-   * Return the start keys of all of the regions in this table,
-   * as a list of ImmutableBytesWritable.
-   */
-  private static List<ImmutableBytesWritable> getRegionStartKeys(HTable table)
-  throws IOException {
-    byte[][] byteKeys = table.getStartKeys();
-    ArrayList<ImmutableBytesWritable> ret =
-      new ArrayList<ImmutableBytesWritable>(byteKeys.length);
-    for (byte[] byteKey : byteKeys) {
-      ret.add(new ImmutableBytesWritable(byteKey));
-    }
-    return ret;
-  }
-
-  /**
-   * Write out a {@link SequenceFile} that can be read by
-   * {@link TotalOrderPartitioner} that contains the split points in startKeys.
-   */
-  private static void writePartitions(Configuration conf, Path partitionsPath,
-      List<ImmutableBytesWritable> startKeys) throws IOException {
-    LOG.info("Writing partition information to " + partitionsPath);
-    if (startKeys.isEmpty()) {
-      throw new IllegalArgumentException("No regions passed");
-    }
-
-    // We're generating a list of split points, and we don't ever
-    // have keys < the first region (which has an empty start key)
-    // so we need to remove it. Otherwise we would end up with an
-    // empty reducer with index 0
-    TreeSet<ImmutableBytesWritable> sorted =
-      new TreeSet<ImmutableBytesWritable>(startKeys);
-
-    ImmutableBytesWritable first = sorted.first();
-    if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
-      throw new IllegalArgumentException(
-          "First region of table should have empty start key. Instead has: "
-          + Bytes.toStringBinary(first.get()));
-    }
-    sorted.remove(first);
-
-    // Write the actual file
-    FileSystem fs = partitionsPath.getFileSystem(conf);
-    SequenceFile.Writer writer = SequenceFile.createWriter(fs,
-        conf, partitionsPath, ImmutableBytesWritable.class, NullWritable.class);
-
-    try {
-      for (ImmutableBytesWritable startKey : sorted) {
-        writer.append(startKey, NullWritable.get());
-      }
-    } finally {
-      writer.close();
-    }
+    return HFileOutputFormat2.createRecordWriter(context);
   }
 
   /**
@@ -323,68 +73,7 @@ public class HFileOutputFormat extends F
    */
   public static void configureIncrementalLoad(Job job, HTable table)
   throws IOException {
-    Configuration conf = job.getConfiguration();
-
-    job.setOutputKeyClass(ImmutableBytesWritable.class);
-    job.setOutputValueClass(KeyValue.class);
-    job.setOutputFormatClass(HFileOutputFormat.class);
-
-    // Based on the configured map output class, set the correct reducer to properly
-    // sort the incoming values.
-    // TODO it would be nice to pick one or the other of these formats.
-    if (KeyValue.class.equals(job.getMapOutputValueClass())) {
-      job.setReducerClass(KeyValueSortReducer.class);
-    } else if (Put.class.equals(job.getMapOutputValueClass())) {
-      job.setReducerClass(PutSortReducer.class);
-    } else if (Text.class.equals(job.getMapOutputValueClass())) {
-      job.setReducerClass(TextSortReducer.class);
-    } else {
-      LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
-    }
-
-    conf.setStrings("io.serializations", conf.get("io.serializations"),
-        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
-        KeyValueSerialization.class.getName());
-
-    // Use table's region boundaries for TOP split points.
-    LOG.info("Looking up current regions for table " + Bytes.toString(table.getTableName()));
-    List<ImmutableBytesWritable> startKeys = getRegionStartKeys(table);
-    LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
-        "to match current region count");
-    job.setNumReduceTasks(startKeys.size());
-
-    configurePartitioner(job, startKeys);
-    // Set compression algorithms based on column families
-    configureCompression(table, conf);
-    configureBloomType(table, conf);
-    configureBlockSize(table, conf);
-
-    TableMapReduceUtil.addDependencyJars(job);
-    TableMapReduceUtil.initCredentials(job);
-    LOG.info("Incremental table " + Bytes.toString(table.getTableName()) + " output configured.");
-  }
-
-  private static void configureBlockSize(HTable table, Configuration conf) throws IOException {
-    StringBuilder blockSizeConfigValue = new StringBuilder();
-    HTableDescriptor tableDescriptor = table.getTableDescriptor();
-    if(tableDescriptor == null){
-      // could happen with mock table instance
-      return;
-    }
-    Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
-    int i = 0;
-    for (HColumnDescriptor familyDescriptor : families) {
-      if (i++ > 0) {
-        blockSizeConfigValue.append('&');
-      }
-      blockSizeConfigValue.append(URLEncoder.encode(
-          familyDescriptor.getNameAsString(), "UTF-8"));
-      blockSizeConfigValue.append('=');
-      blockSizeConfigValue.append(URLEncoder.encode(
-          String.valueOf(familyDescriptor.getBlocksize()), "UTF-8"));
-    }
-    // Get rid of the last ampersand
-    conf.set(BLOCK_SIZE_CONF_KEY, blockSizeConfigValue.toString());
+    HFileOutputFormat2.configureIncrementalLoad(job, table, HFileOutputFormat.class);
   }
 
   /**
@@ -398,41 +87,7 @@ public class HFileOutputFormat extends F
    *         algorithm
    */
   static Map<byte[], String> createFamilyCompressionMap(Configuration conf) {
-    return createFamilyConfValueMap(conf, COMPRESSION_CONF_KEY);
-  }
-
-  private static Map<byte[], String> createFamilyBloomMap(Configuration conf) {
-    return createFamilyConfValueMap(conf, BLOOM_TYPE_CONF_KEY);
-  }
-
-  private static Map<byte[], String> createFamilyBlockSizeMap(Configuration conf) {
-    return createFamilyConfValueMap(conf, BLOCK_SIZE_CONF_KEY);
-  }
-
-  /**
-   * Run inside the task to deserialize column family to given conf value map.
-   *
-   * @param conf
-   * @param confName
-   * @return a map of column family to the given configuration value
-   */
-  private static Map<byte[], String> createFamilyConfValueMap(Configuration conf, String confName) {
-    Map<byte[], String> confValMap = new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR);
-    String confVal = conf.get(confName, "");
-    for (String familyConf : confVal.split("&")) {
-      String[] familySplit = familyConf.split("=");
-      if (familySplit.length != 2) {
-        continue;
-      }
-      try {
-        confValMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(),
-            URLDecoder.decode(familySplit[1], "UTF-8"));
-      } catch (UnsupportedEncodingException e) {
-        // will not happen with UTF-8 encoding
-        throw new AssertionError(e);
-      }
-    }
-    return confValMap;
+    return HFileOutputFormat2.createFamilyCompressionMap(conf);
   }
 
   /**
@@ -441,17 +96,7 @@ public class HFileOutputFormat extends F
    */
   static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints)
       throws IOException {
-
-    // create the partitions file
-    FileSystem fs = FileSystem.get(job.getConfiguration());
-    Path partitionsPath = new Path("/tmp", "partitions_" + UUID.randomUUID());
-    fs.makeQualified(partitionsPath);
-    fs.deleteOnExit(partitionsPath);
-    writePartitions(job.getConfiguration(), partitionsPath, splitPoints);
-
-    // configure job to use it
-    job.setPartitionerClass(TotalOrderPartitioner.class);
-    TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), partitionsPath);
+    HFileOutputFormat2.configurePartitioner(job, splitPoints);
   }
 
   /**
@@ -466,24 +111,7 @@ public class HFileOutputFormat extends F
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(
       value="RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
   static void configureCompression(HTable table, Configuration conf) throws IOException {
-    StringBuilder compressionConfigValue = new StringBuilder();
-    HTableDescriptor tableDescriptor = table.getTableDescriptor();
-    if(tableDescriptor == null){
-      // could happen with mock table instance
-      return;
-    }
-    Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
-    int i = 0;
-    for (HColumnDescriptor familyDescriptor : families) {
-      if (i++ > 0) {
-        compressionConfigValue.append('&');
-      }
-      compressionConfigValue.append(URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8"));
-      compressionConfigValue.append('=');
-      compressionConfigValue.append(URLEncoder.encode(familyDescriptor.getCompression().getName(), "UTF-8"));
-    }
-    // Get rid of the last ampersand
-    conf.set(COMPRESSION_CONF_KEY, compressionConfigValue.toString());
+    HFileOutputFormat2.configureCompression(table, conf);
   }
 
   /**
@@ -494,26 +122,6 @@ public class HFileOutputFormat extends F
    *           on failure to read column family descriptors
    */
   static void configureBloomType(HTable table, Configuration conf) throws IOException {
-    HTableDescriptor tableDescriptor = table.getTableDescriptor();
-    if (tableDescriptor == null) {
-      // could happen with mock table instance
-      return;
-    }
-    StringBuilder bloomTypeConfigValue = new StringBuilder();
-    Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
-    int i = 0;
-    for (HColumnDescriptor familyDescriptor : families) {
-      if (i++ > 0) {
-        bloomTypeConfigValue.append('&');
-      }
-      bloomTypeConfigValue.append(URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8"));
-      bloomTypeConfigValue.append('=');
-      String bloomType = familyDescriptor.getBloomFilterType().toString();
-      if (bloomType == null) {
-        bloomType = HColumnDescriptor.DEFAULT_BLOOMFILTER;
-      }
-      bloomTypeConfigValue.append(URLEncoder.encode(bloomType, "UTF-8"));
-    }
-    conf.set(BLOOM_TYPE_CONF_KEY, bloomTypeConfigValue.toString());
+    HFileOutputFormat2.configureBloomType(table, conf);
   }
 }
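
[Editorial note, not part of the commit] All of the writer-side configuration keys the old class used to own now live in HFileOutputFormat2 (added below); the deprecated class above only forwards to it, so the knobs and their names are unchanged. A small sketch of setting them on a job configuration; the values (FAST_DIFF, 128 MB, gz) are illustrative choices, not defaults from this commit.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.compress.Compression;

public class HFileOutputTuningSketch {
  /** Applies a few of the knobs read in createRecordWriter (see HFileOutputFormat2 below). */
  static void applyTuning(Configuration conf) {
    // Data block encoding for the written HFiles; an unknown name makes the
    // record writer fail at setup with "Invalid data block encoding type ...".
    conf.set("hbase.mapreduce.hfileoutputformat.datablock.encoding", "FAST_DIFF");
    // Mark the bulk-loaded files as excluded from minor compactions.
    conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", true);
    // Roll all writers once any single HFile reaches this size (illustrative 128 MB).
    conf.setLong(HConstants.HREGION_MAX_FILESIZE, 128L * 1024 * 1024);
    // Fallback compression for families with no per-family setting.
    conf.set("hfile.compression", Compression.Algorithm.GZ.getName());
  }

  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    applyTuning(conf);
    System.out.println(conf.get("hbase.mapreduce.hfileoutputformat.datablock.encoding"));
  }
}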

Added: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java?rev=1570714&view=auto
==============================================================================
--- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java (added)
+++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java Fri Feb 21 21:29:12 2014
@@ -0,0 +1,535 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
+import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
+import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
+
+/**
+ * Writes HFiles. Passed Cells must arrive in order.
+ * Writes current time as the sequence id for the file. Sets the major compacted
+ * attribute on created hfiles. Calling write(null,null) will forceably roll
+ * all HFiles being written.
+ * <p>
+ * Using this class as part of a MapReduce job is best done
+ * using {@link #configureIncrementalLoad(Job, HTable)}.
+ * @see KeyValueSortReducer
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class HFileOutputFormat2 extends FileOutputFormat<ImmutableBytesWritable, Cell> {
+  static Log LOG = LogFactory.getLog(HFileOutputFormat2.class);
+  static final String COMPRESSION_CONF_KEY = "hbase.hfileoutputformat.families.compression";
+  private static final String BLOOM_TYPE_CONF_KEY = "hbase.hfileoutputformat.families.bloomtype";
+  private static final String DATABLOCK_ENCODING_CONF_KEY =
+     "hbase.mapreduce.hfileoutputformat.datablock.encoding";
+  private static final String BLOCK_SIZE_CONF_KEY = "hbase.mapreduce.hfileoutputformat.blocksize";
+
+  public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(final TaskAttemptContext context)
+  throws IOException, InterruptedException {
+    return createRecordWriter(context);
+  }
+
+  static <V extends Cell> RecordWriter<ImmutableBytesWritable, V>
+      createRecordWriter(final TaskAttemptContext context)
+          throws IOException, InterruptedException {
+    // Get the path of the temporary output file
+    final Path outputPath = FileOutputFormat.getOutputPath(context);
+    final Path outputdir = new FileOutputCommitter(outputPath, context).getWorkPath();
+    final Configuration conf = context.getConfiguration();
+    final FileSystem fs = outputdir.getFileSystem(conf);
+    // These configs. are from hbase-*.xml
+    final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
+        HConstants.DEFAULT_MAX_FILE_SIZE);
+    // Invented config.  Add to hbase-*.xml if other than default compression.
+    final String defaultCompression = conf.get("hfile.compression",
+        Compression.Algorithm.NONE.getName());
+    final boolean compactionExclude = conf.getBoolean(
+        "hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
+
+    // create a map from column family to the compression algorithm
+    final Map<byte[], String> compressionMap = createFamilyCompressionMap(conf);
+    final Map<byte[], String> bloomTypeMap = createFamilyBloomMap(conf);
+    final Map<byte[], String> blockSizeMap = createFamilyBlockSizeMap(conf);
+
+    String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_CONF_KEY);
+    final HFileDataBlockEncoder encoder;
+    if (dataBlockEncodingStr == null) {
+      encoder = NoOpDataBlockEncoder.INSTANCE;
+    } else {
+      try {
+        encoder = new HFileDataBlockEncoderImpl(DataBlockEncoding
+            .valueOf(dataBlockEncodingStr));
+      } catch (IllegalArgumentException ex) {
+        throw new RuntimeException(
+            "Invalid data block encoding type configured for the param "
+                + DATABLOCK_ENCODING_CONF_KEY + " : " + dataBlockEncodingStr);
+      }
+    }
+
+    return new RecordWriter<ImmutableBytesWritable, V>() {
+      // Map of families to writers and how much has been output on the writer.
+      private final Map<byte [], WriterLength> writers =
+        new TreeMap<byte [], WriterLength>(Bytes.BYTES_COMPARATOR);
+      private byte [] previousRow = HConstants.EMPTY_BYTE_ARRAY;
+      private final byte [] now = Bytes.toBytes(System.currentTimeMillis());
+      private boolean rollRequested = false;
+
+      public void write(ImmutableBytesWritable row, V cell)
+      throws IOException {
+        KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+
+        // null input == user explicitly wants to flush
+        if (row == null && kv == null) {
+          rollWriters();
+          return;
+        }
+
+        byte [] rowKey = kv.getRow();
+        long length = kv.getLength();
+        byte [] family = kv.getFamily();
+        WriterLength wl = this.writers.get(family);
+
+        // If this is a new column family, verify that the directory exists
+        if (wl == null) {
+          fs.mkdirs(new Path(outputdir, Bytes.toString(family)));
+        }
+
+        // If any of the HFiles for the column families has reached
+        // maxsize, we need to roll all the writers
+        if (wl != null && wl.written + length >= maxsize) {
+          this.rollRequested = true;
+        }
+
+        // This can only happen once a row is finished though
+        if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
+          rollWriters();
+        }
+
+        // create a new HLog writer, if necessary
+        if (wl == null || wl.writer == null) {
+          wl = getNewWriter(family, conf);
+        }
+
+        // we now have the proper HLog writer. full steam ahead
+        kv.updateLatestStamp(this.now);
+        wl.writer.append(kv);
+        wl.written += length;
+
+        // Copy the row so we know when a row transition.
+        this.previousRow = rowKey;
+      }
+
+      private void rollWriters() throws IOException {
+        for (WriterLength wl : this.writers.values()) {
+          if (wl.writer != null) {
+            LOG.info("Writer=" + wl.writer.getPath() +
+                ((wl.written == 0)? "": ", wrote=" + wl.written));
+            close(wl.writer);
+          }
+          wl.writer = null;
+          wl.written = 0;
+        }
+        this.rollRequested = false;
+      }
+
+      /* Create a new StoreFile.Writer.
+       * @param family
+       * @return A WriterLength, containing a new StoreFile.Writer.
+       * @throws IOException
+       */
+      private WriterLength getNewWriter(byte[] family, Configuration conf)
+          throws IOException {
+        WriterLength wl = new WriterLength();
+        Path familydir = new Path(outputdir, Bytes.toString(family));
+        String compression = compressionMap.get(family);
+        compression = compression == null ? defaultCompression : compression;
+        String bloomTypeStr = bloomTypeMap.get(family);
+        BloomType bloomType = BloomType.NONE;
+        if (bloomTypeStr != null) {
+          bloomType = BloomType.valueOf(bloomTypeStr);
+        }
+        String blockSizeString = blockSizeMap.get(family);
+        int blockSize = blockSizeString == null ? HConstants.DEFAULT_BLOCKSIZE
+            : Integer.parseInt(blockSizeString);
+        Configuration tempConf = new Configuration(conf);
+        tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
+        wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs, blockSize)
+            .withOutputDir(familydir)
+            .withCompression(AbstractHFileWriter.compressionByName(compression))
+            .withBloomType(bloomType)
+            .withComparator(KeyValue.COMPARATOR)
+            .withDataBlockEncoder(encoder)
+            .withChecksumType(HStore.getChecksumType(conf))
+            .withBytesPerChecksum(HStore.getBytesPerChecksum(conf))
+            .build();
+
+        this.writers.put(family, wl);
+        return wl;
+      }
+
+      private void close(final StoreFile.Writer w) throws IOException {
+        if (w != null) {
+          w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
+              Bytes.toBytes(System.currentTimeMillis()));
+          w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
+              Bytes.toBytes(context.getTaskAttemptID().toString()));
+          w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
+              Bytes.toBytes(true));
+          w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
+              Bytes.toBytes(compactionExclude));
+          w.appendTrackedTimestampsToMetadata();
+          w.close();
+        }
+      }
+
+      public void close(TaskAttemptContext c)
+      throws IOException, InterruptedException {
+        for (WriterLength wl: this.writers.values()) {
+          close(wl.writer);
+        }
+      }
+    };
+  }
+
+  /*
+   * Data structure to hold a Writer and amount of data written on it.
+   */
+  static class WriterLength {
+    long written = 0;
+    StoreFile.Writer writer = null;
+  }
+
+  /**
+   * Return the start keys of all of the regions in this table,
+   * as a list of ImmutableBytesWritable.
+   */
+  private static List<ImmutableBytesWritable> getRegionStartKeys(HTable table)
+  throws IOException {
+    byte[][] byteKeys = table.getStartKeys();
+    ArrayList<ImmutableBytesWritable> ret =
+      new ArrayList<ImmutableBytesWritable>(byteKeys.length);
+    for (byte[] byteKey : byteKeys) {
+      ret.add(new ImmutableBytesWritable(byteKey));
+    }
+    return ret;
+  }
+
+  /**
+   * Write out a {@link SequenceFile} that can be read by
+   * {@link TotalOrderPartitioner} that contains the split points in startKeys.
+   */
+  private static void writePartitions(Configuration conf, Path partitionsPath,
+      List<ImmutableBytesWritable> startKeys) throws IOException {
+    LOG.info("Writing partition information to " + partitionsPath);
+    if (startKeys.isEmpty()) {
+      throw new IllegalArgumentException("No regions passed");
+    }
+
+    // We're generating a list of split points, and we don't ever
+    // have keys < the first region (which has an empty start key)
+    // so we need to remove it. Otherwise we would end up with an
+    // empty reducer with index 0
+    TreeSet<ImmutableBytesWritable> sorted =
+      new TreeSet<ImmutableBytesWritable>(startKeys);
+
+    ImmutableBytesWritable first = sorted.first();
+    if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
+      throw new IllegalArgumentException(
+          "First region of table should have empty start key. Instead has: "
+          + Bytes.toStringBinary(first.get()));
+    }
+    sorted.remove(first);
+
+    // Write the actual file
+    FileSystem fs = partitionsPath.getFileSystem(conf);
+    SequenceFile.Writer writer = SequenceFile.createWriter(fs,
+        conf, partitionsPath, ImmutableBytesWritable.class, NullWritable.class);
+
+    try {
+      for (ImmutableBytesWritable startKey : sorted) {
+        writer.append(startKey, NullWritable.get());
+      }
+    } finally {
+      writer.close();
+    }
+  }
+
+  /**
+   * Configure a MapReduce Job to perform an incremental load into the given
+   * table. This
+   * <ul>
+   *   <li>Inspects the table to configure a total order partitioner</li>
+   *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
+   *   <li>Sets the number of reduce tasks to match the current number of regions</li>
+   *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
+   *   <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
+   *     PutSortReducer)</li>
+   * </ul>
+   * The user should be sure to set the map output value class to either KeyValue or Put before
+   * running this function.
+   */
+  public static void configureIncrementalLoad(Job job, HTable table)
+  throws IOException {
+    configureIncrementalLoad(job, table, HFileOutputFormat2.class);
+  }
+
+  static void configureIncrementalLoad(Job job, HTable table,
+      Class<? extends OutputFormat<?, ?>> cls) throws IOException {
+    Configuration conf = job.getConfiguration();
+
+    job.setOutputKeyClass(ImmutableBytesWritable.class);
+    job.setOutputValueClass(KeyValue.class);
+    job.setOutputFormatClass(HFileOutputFormat2.class);
+
+    // Based on the configured map output class, set the correct reducer to properly
+    // sort the incoming values.
+    // TODO it would be nice to pick one or the other of these formats.
+    if (KeyValue.class.equals(job.getMapOutputValueClass())) {
+      job.setReducerClass(KeyValueSortReducer.class);
+    } else if (Put.class.equals(job.getMapOutputValueClass())) {
+      job.setReducerClass(PutSortReducer.class);
+    } else if (Text.class.equals(job.getMapOutputValueClass())) {
+      job.setReducerClass(TextSortReducer.class);
+    } else {
+      LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
+    }
+
+    conf.setStrings("io.serializations", conf.get("io.serializations"),
+        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
+        KeyValueSerialization.class.getName());
+
+    // Use table's region boundaries for TOP split points.
+    LOG.info("Looking up current regions for table " + Bytes.toString(table.getTableName()));
+    List<ImmutableBytesWritable> startKeys = getRegionStartKeys(table);
+    LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
+        "to match current region count");
+    job.setNumReduceTasks(startKeys.size());
+
+    configurePartitioner(job, startKeys);
+    // Set compression algorithms based on column families
+    configureCompression(table, conf);
+    configureBloomType(table, conf);
+    configureBlockSize(table, conf);
+
+    TableMapReduceUtil.addDependencyJars(job);
+    TableMapReduceUtil.initCredentials(job);
+    LOG.info("Incremental table " + Bytes.toString(table.getTableName()) + " output configured.");
+  }
+
+  private static void configureBlockSize(HTable table, Configuration conf) throws IOException {
+    StringBuilder blockSizeConfigValue = new StringBuilder();
+    HTableDescriptor tableDescriptor = table.getTableDescriptor();
+    if(tableDescriptor == null){
+      // could happen with mock table instance
+      return;
+    }
+    Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
+    int i = 0;
+    for (HColumnDescriptor familyDescriptor : families) {
+      if (i++ > 0) {
+        blockSizeConfigValue.append('&');
+      }
+      blockSizeConfigValue.append(URLEncoder.encode(
+          familyDescriptor.getNameAsString(), "UTF-8"));
+      blockSizeConfigValue.append('=');
+      blockSizeConfigValue.append(URLEncoder.encode(
+          String.valueOf(familyDescriptor.getBlocksize()), "UTF-8"));
+    }
+    // Get rid of the last ampersand
+    conf.set(BLOCK_SIZE_CONF_KEY, blockSizeConfigValue.toString());
+  }
+
+  /**
+   * Run inside the task to deserialize column family to compression algorithm
+   * map from the
+   * configuration.
+   *
+   * Package-private for unit tests only.
+   *
+   * @return a map from column family to the name of the configured compression
+   *         algorithm
+   */
+  static Map<byte[], String> createFamilyCompressionMap(Configuration conf) {
+    return createFamilyConfValueMap(conf, COMPRESSION_CONF_KEY);
+  }
+
+  private static Map<byte[], String> createFamilyBloomMap(Configuration conf) {
+    return createFamilyConfValueMap(conf, BLOOM_TYPE_CONF_KEY);
+  }
+
+  private static Map<byte[], String> createFamilyBlockSizeMap(Configuration conf) {
+    return createFamilyConfValueMap(conf, BLOCK_SIZE_CONF_KEY);
+  }
+
+  /**
+   * Run inside the task to deserialize column family to given conf value map.
+   *
+   * @param conf
+   * @param confName
+   * @return a map of column family to the given configuration value
+   */
+  private static Map<byte[], String> createFamilyConfValueMap(Configuration conf, String confName) {
+    Map<byte[], String> confValMap = new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR);
+    String confVal = conf.get(confName, "");
+    for (String familyConf : confVal.split("&")) {
+      String[] familySplit = familyConf.split("=");
+      if (familySplit.length != 2) {
+        continue;
+      }
+      try {
+        confValMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(),
+            URLDecoder.decode(familySplit[1], "UTF-8"));
+      } catch (UnsupportedEncodingException e) {
+        // will not happen with UTF-8 encoding
+        throw new AssertionError(e);
+      }
+    }
+    return confValMap;
+  }
+
+  /**
+   * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
+   * <code>splitPoints</code>. Cleans up the partitions file after job exists.
+   */
+  static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints)
+      throws IOException {
+
+    // create the partitions file
+    FileSystem fs = FileSystem.get(job.getConfiguration());
+    Path partitionsPath = new Path("/tmp", "partitions_" + UUID.randomUUID());
+    fs.makeQualified(partitionsPath);
+    fs.deleteOnExit(partitionsPath);
+    writePartitions(job.getConfiguration(), partitionsPath, splitPoints);
+
+    // configure job to use it
+    job.setPartitionerClass(TotalOrderPartitioner.class);
+    TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), partitionsPath);
+  }
+
+  /**
+   * Serialize column family to compression algorithm map to configuration.
+   * Invoked while configuring the MR job for incremental load.
+   *
+   * Package-private for unit tests only.
+   *
+   * @throws IOException
+   *           on failure to read column family descriptors
+   */
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
+      value="RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
+  static void configureCompression(HTable table, Configuration conf) throws IOException {
+    StringBuilder compressionConfigValue = new StringBuilder();
+    HTableDescriptor tableDescriptor = table.getTableDescriptor();
+    if(tableDescriptor == null){
+      // could happen with mock table instance
+      return;
+    }
+    Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
+    int i = 0;
+    for (HColumnDescriptor familyDescriptor : families) {
+      if (i++ > 0) {
+        compressionConfigValue.append('&');
+      }
+      compressionConfigValue.append(URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8"));
+      compressionConfigValue.append('=');
+      compressionConfigValue.append(URLEncoder.encode(familyDescriptor.getCompression().getName(), "UTF-8"));
+    }
+    // Get rid of the last ampersand
+    conf.set(COMPRESSION_CONF_KEY, compressionConfigValue.toString());
+  }
+
+  /**
+   * Serialize column family to bloom type map to configuration.
+   * Invoked while configuring the MR job for incremental load.
+   *
+   * @throws IOException
+   *           on failure to read column family descriptors
+   */
+  static void configureBloomType(HTable table, Configuration conf) throws IOException {
+    HTableDescriptor tableDescriptor = table.getTableDescriptor();
+    if (tableDescriptor == null) {
+      // could happen with mock table instance
+      return;
+    }
+    StringBuilder bloomTypeConfigValue = new StringBuilder();
+    Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
+    int i = 0;
+    for (HColumnDescriptor familyDescriptor : families) {
+      if (i++ > 0) {
+        bloomTypeConfigValue.append('&');
+      }
+      bloomTypeConfigValue.append(URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8"));
+      bloomTypeConfigValue.append('=');
+      String bloomType = familyDescriptor.getBloomFilterType().toString();
+      if (bloomType == null) {
+        bloomType = HColumnDescriptor.DEFAULT_BLOOMFILTER;
+      }
+      bloomTypeConfigValue.append(URLEncoder.encode(bloomType, "UTF-8"));
+    }
+    conf.set(BLOOM_TYPE_CONF_KEY, bloomTypeConfigValue.toString());
+  }
+}

Propchange: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
------------------------------------------------------------------------------
    svn:eol-style = native
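
[Editorial note, not part of the commit] The per-family compression, bloom-type and block-size settings travel through the job configuration as a single URL-encoded family=value&family=value string: configureCompression, configureBloomType and configureBlockSize build it on the client, and createFamilyConfValueMap splits it back apart inside the task. A standalone sketch of that round trip using the same URLEncoder/URLDecoder calls as the patch; the family names and values below are made up.

import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;

public class FamilyConfStringSketch {
  /** Builds the "cf1=v1&cf2=v2" form written into keys such as
   *  hbase.hfileoutputformat.families.compression. */
  static String encode(Map<String, String> perFamily) throws UnsupportedEncodingException {
    StringBuilder sb = new StringBuilder();
    for (Map.Entry<String, String> e : perFamily.entrySet()) {
      if (sb.length() > 0) sb.append('&');
      sb.append(URLEncoder.encode(e.getKey(), "UTF-8"))
        .append('=')
        .append(URLEncoder.encode(e.getValue(), "UTF-8"));
    }
    return sb.toString();
  }

  /** Mirrors createFamilyConfValueMap: split on '&', then on '=', URL-decode both halves. */
  static Map<String, String> decode(String confVal) throws UnsupportedEncodingException {
    Map<String, String> out = new LinkedHashMap<String, String>();
    for (String pair : confVal.split("&")) {
      String[] kv = pair.split("=");
      if (kv.length != 2) continue;           // tolerate empty or garbled entries, as the patch does
      out.put(URLDecoder.decode(kv[0], "UTF-8"), URLDecoder.decode(kv[1], "UTF-8"));
    }
    return out;
  }

  public static void main(String[] args) throws UnsupportedEncodingException {
    Map<String, String> compression = new LinkedHashMap<String, String>();
    compression.put("cf1", "gz");             // hypothetical families and algorithms
    compression.put("cf2", "none");
    String serialized = encode(compression);  // "cf1=gz&cf2=none"
    System.out.println(serialized);
    System.out.println(decode(serialized));   // {cf1=gz, cf2=none}
  }
}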

Added: hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java?rev=1570714&view=auto
==============================================================================
--- hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java (added)
+++ hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java Fri Feb 21 21:29:12 2014
@@ -0,0 +1,872 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.concurrent.Callable;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.HadoopShims;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.PerformanceEvaluation;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Simple test for {@link KeyValueSortReducer} and {@link HFileOutputFormat2}.
+ * Sets up and runs a mapreduce job that writes hfile output.
+ * Creates a few inner classes to implement splits and an inputformat that
+ * emits keys and values like those of {@link PerformanceEvaluation}.
+ */
+@Category(LargeTests.class)
+public class TestHFileOutputFormat2  {
+  private final static int ROWSPERSPLIT = 1024;
+
+  private static final byte[][] FAMILIES
+    = { Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-A"))
+      , Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B"))};
+  private static final TableName TABLE_NAME =
+      TableName.valueOf("TestTable");
+
+  private HBaseTestingUtility util = new HBaseTestingUtility();
+
+  private static Log LOG = LogFactory.getLog(TestHFileOutputFormat2.class);
+
+  /**
+   * Simple mapper that makes KeyValue output.
+   */
+  static class RandomKVGeneratingMapper
+  extends Mapper<NullWritable, NullWritable,
+                 ImmutableBytesWritable, KeyValue> {
+
+    private int keyLength;
+    private static final int KEYLEN_DEFAULT=10;
+    private static final String KEYLEN_CONF="randomkv.key.length";
+
+    private int valLength;
+    private static final int VALLEN_DEFAULT=10;
+    private static final String VALLEN_CONF="randomkv.val.length";
+
+    @Override
+    protected void setup(Context context) throws IOException,
+        InterruptedException {
+      super.setup(context);
+
+      Configuration conf = context.getConfiguration();
+      keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
+      valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
+    }
+
+    protected void map(
+        NullWritable n1, NullWritable n2,
+        Mapper<NullWritable, NullWritable,
+               ImmutableBytesWritable,KeyValue>.Context context)
+        throws java.io.IOException ,InterruptedException
+    {
+
+      byte keyBytes[] = new byte[keyLength];
+      byte valBytes[] = new byte[valLength];
+
+      int taskId = context.getTaskAttemptID().getTaskID().getId();
+      assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!";
+
+      Random random = new Random();
+      for (int i = 0; i < ROWSPERSPLIT; i++) {
+
+        random.nextBytes(keyBytes);
+        // Ensure that unique tasks generate unique keys
+        keyBytes[keyLength - 1] = (byte)(taskId & 0xFF);
+        random.nextBytes(valBytes);
+        ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
+
+        for (byte[] family : TestHFileOutputFormat2.FAMILIES) {
+          KeyValue kv = new KeyValue(keyBytes, family,
+              PerformanceEvaluation.QUALIFIER_NAME, valBytes);
+          context.write(key, kv);
+        }
+      }
+    }
+  }
+
+  private void setupRandomGeneratorMapper(Job job) {
+    job.setInputFormatClass(NMapInputFormat.class);
+    job.setMapperClass(RandomKVGeneratingMapper.class);
+    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+    job.setMapOutputValueClass(KeyValue.class);
+  }
+
+  /**
+   * Test that {@link HFileOutputFormat2} RecordWriter amends timestamps if
+   * passed a keyvalue whose timestamp is {@link HConstants#LATEST_TIMESTAMP}.
+   * @see <a href="https://issues.apache.org/jira/browse/HBASE-2615">HBASE-2615</a>
+   */
+  @Test
+  public void test_LATEST_TIMESTAMP_isReplaced()
+  throws Exception {
+    Configuration conf = new Configuration(this.util.getConfiguration());
+    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
+    TaskAttemptContext context = null;
+    Path dir =
+      util.getDataTestDir("test_LATEST_TIMESTAMP_isReplaced");
+    try {
+      Job job = new Job(conf);
+      FileOutputFormat.setOutputPath(job, dir);
+      context = createTestTaskAttemptContext(job);
+      HFileOutputFormat2 hof = new HFileOutputFormat2();
+      writer = hof.getRecordWriter(context);
+      final byte [] b = Bytes.toBytes("b");
+
+      // Test 1.  Pass a KV that has a ts of LATEST_TIMESTAMP.  It should be
+      // changed by call to write.  Check all in kv is same but ts.
+      KeyValue kv = new KeyValue(b, b, b);
+      KeyValue original = kv.clone();
+      writer.write(new ImmutableBytesWritable(), kv);
+      assertFalse(original.equals(kv));
+      assertTrue(Bytes.equals(original.getRow(), kv.getRow()));
+      assertTrue(original.matchingColumn(kv.getFamily(), kv.getQualifier()));
+      assertNotSame(original.getTimestamp(), kv.getTimestamp());
+      assertNotSame(HConstants.LATEST_TIMESTAMP, kv.getTimestamp());
+
+      // Test 2. Now test passing a kv that has explicit ts.  It should not be
+      // changed by call to record write.
+      kv = new KeyValue(b, b, b, kv.getTimestamp() - 1, b);
+      original = kv.clone();
+      writer.write(new ImmutableBytesWritable(), kv);
+      assertTrue(original.equals(kv));
+    } finally {
+      if (writer != null && context != null) writer.close(context);
+      dir.getFileSystem(conf).delete(dir, true);
+    }
+  }
+
+  private TaskAttemptContext createTestTaskAttemptContext(final Job job)
+  throws IOException, Exception {
+    HadoopShims hadoop = CompatibilitySingletonFactory.getInstance(HadoopShims.class);
+    TaskAttemptContext context = hadoop.createTestTaskAttemptContext(job, "attempt_200707121733_0001_m_000000_0");
+    return context;
+  }
+
+  /*
+   * Test that {@link HFileOutputFormat2} creates an HFile with TIMERANGE
+   * metadata used by time-restricted scans.
+   */
+  @Test
+  public void test_TIMERANGE() throws Exception {
+    Configuration conf = new Configuration(this.util.getConfiguration());
+    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
+    TaskAttemptContext context = null;
+    Path dir =
+      util.getDataTestDir("test_TIMERANGE_present");
+    LOG.info("Timerange dir writing to dir: "+ dir);
+    try {
+      // build a record writer using HFileOutputFormat2
+      Job job = new Job(conf);
+      FileOutputFormat.setOutputPath(job, dir);
+      context = createTestTaskAttemptContext(job);
+      HFileOutputFormat2 hof = new HFileOutputFormat2();
+      writer = hof.getRecordWriter(context);
+
+      // Pass two key values with explicit times stamps
+      final byte [] b = Bytes.toBytes("b");
+
+      // value 1 with timestamp 2000
+      KeyValue kv = new KeyValue(b, b, b, 2000, b);
+      KeyValue original = kv.clone();
+      writer.write(new ImmutableBytesWritable(), kv);
+      assertEquals(original,kv);
+
+      // value 2 with timestamp 1000
+      kv = new KeyValue(b, b, b, 1000, b);
+      original = kv.clone();
+      writer.write(new ImmutableBytesWritable(), kv);
+      assertEquals(original, kv);
+
+      // verify that the file has the proper FileInfo.
+      writer.close(context);
+
+      // the generated file lives 1 directory down from the attempt directory
+      // and is the only file, e.g.
+      // _attempt__0000_r_000000_0/b/1979617994050536795
+      FileSystem fs = FileSystem.get(conf);
+      Path attemptDirectory = hof.getDefaultWorkFile(context, "").getParent();
+      FileStatus[] sub1 = fs.listStatus(attemptDirectory);
+      FileStatus[] file = fs.listStatus(sub1[0].getPath());
+
+      // open as HFile Reader and pull out TIMERANGE FileInfo.
+      HFile.Reader rd = HFile.createReader(fs, file[0].getPath(),
+          new CacheConfig(conf));
+      Map<byte[],byte[]> finfo = rd.loadFileInfo();
+      byte[] range = finfo.get("TIMERANGE".getBytes());
+      assertNotNull(range);
+
+      // unmarshall and check values.
+      TimeRangeTracker timeRangeTracker = new TimeRangeTracker();
+      Writables.copyWritable(range, timeRangeTracker);
+      LOG.info(timeRangeTracker.getMinimumTimestamp() +
+          "...." + timeRangeTracker.getMaximumTimestamp());
+      assertEquals(1000, timeRangeTracker.getMinimumTimestamp());
+      assertEquals(2000, timeRangeTracker.getMaximumTimestamp());
+      rd.close();
+    } finally {
+      if (writer != null && context != null) writer.close(context);
+      dir.getFileSystem(conf).delete(dir, true);
+    }
+  }
+
+  /**
+   * Run a small MR job that writes random PerformanceEvaluation-style data through
+   * HFileOutputFormat2 and verify that output files are produced.
+   */
+  @Test
+  public void testWritingPEData() throws Exception {
+    Configuration conf = util.getConfiguration();
+    Path testDir = util.getDataTestDirOnTestFS("testWritingPEData");
+    FileSystem fs = testDir.getFileSystem(conf);
+
+    // Lower this value or we OOME in Eclipse.
+    conf.setInt("io.sort.mb", 20);
+    // Write a few files.
+    conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024);
+
+    Job job = new Job(conf, "testWritingPEData");
+    setupRandomGeneratorMapper(job);
+    // This partitioner doesn't work well for number keys, but we use it anyway
+    // just to demonstrate how to configure it.
+    byte[] startKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];
+    byte[] endKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];
+
+    Arrays.fill(startKey, (byte)0);
+    Arrays.fill(endKey, (byte)0xff);
+
+    job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
+    // Set start and end rows for partitioner.
+    SimpleTotalOrderPartitioner.setStartKey(job.getConfiguration(), startKey);
+    SimpleTotalOrderPartitioner.setEndKey(job.getConfiguration(), endKey);
+    job.setReducerClass(KeyValueSortReducer.class);
+    job.setOutputFormatClass(HFileOutputFormat2.class);
+    job.setNumReduceTasks(4);
+    job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
+        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
+        KeyValueSerialization.class.getName());
+
+    FileOutputFormat.setOutputPath(job, testDir);
+    assertTrue(job.waitForCompletion(false));
+    FileStatus [] files = fs.listStatus(testDir);
+    assertTrue(files.length > 0);
+  }
+
+  @Test
+  public void testJobConfiguration() throws Exception {
+    Job job = new Job(util.getConfiguration());
+    job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration"));
+    HTable table = Mockito.mock(HTable.class);
+    setupMockStartKeys(table);
+    HFileOutputFormat2.configureIncrementalLoad(job, table);
+    assertEquals(4, job.getNumReduceTasks());
+  }
+
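+  /**
+   * @return numKeys region start keys; the first is always the empty byte array so
+   *         the first region covers the beginning of the keyspace, the rest are random.
+   */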
+  private byte [][] generateRandomStartKeys(int numKeys) {
+    Random random = new Random();
+    byte[][] ret = new byte[numKeys][];
+    // first region start key is always empty
+    ret[0] = HConstants.EMPTY_BYTE_ARRAY;
+    for (int i = 1; i < numKeys; i++) {
+      ret[i] = PerformanceEvaluation.generateValue(random);
+    }
+    return ret;
+  }
+
+  @Test
+  public void testMRIncrementalLoad() throws Exception {
+    LOG.info("\nStarting test testMRIncrementalLoad\n");
+    doIncrementalLoadTest(false);
+  }
+
+  @Test
+  public void testMRIncrementalLoadWithSplit() throws Exception {
+    LOG.info("\nStarting test testMRIncrementalLoadWithSplit\n");
+    doIncrementalLoadTest(true);
+  }
+
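+  /**
+   * Writes HFiles with an incremental-load MR job against a mini cluster, bulk loads
+   * them with LoadIncrementalHFiles, and verifies the row count and row checksum
+   * survive a disable/enable of the table. When shouldChangeRegions is true, the
+   * table is re-split between writing the HFiles and bulk loading them.
+   */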
+  private void doIncrementalLoadTest(
+      boolean shouldChangeRegions) throws Exception {
+    util = new HBaseTestingUtility();
+    Configuration conf = util.getConfiguration();
+    byte[][] startKeys = generateRandomStartKeys(5);
+    HBaseAdmin admin = null;
+    try {
+      util.startMiniCluster();
+      Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
+      admin = new HBaseAdmin(conf);
+      HTable table = util.createTable(TABLE_NAME, FAMILIES);
+      assertEquals("Should start with empty table",
+          0, util.countRows(table));
+      int numRegions = util.createMultiRegions(
+          util.getConfiguration(), table, FAMILIES[0], startKeys);
+      assertEquals("Should make 5 regions", numRegions, 5);
+
+      // Generate the bulk load files
+      util.startMiniMapReduceCluster();
+      runIncrementalPELoad(conf, table, testDir);
+      // This doesn't write into the table, just makes files
+      assertEquals("HFOF should not touch actual table",
+          0, util.countRows(table));
+
+      // Make sure that a directory was created for every CF
+      int dirCount = 0;
+      for (FileStatus f : testDir.getFileSystem(conf).listStatus(testDir)) {
+        for (byte[] family : FAMILIES) {
+          if (Bytes.toString(family).equals(f.getPath().getName())) {
+            ++dirCount;
+          }
+        }
+      }
+      assertEquals("Column family not found in FS.", FAMILIES.length, dirCount);
+
+      // handle the split case
+      if (shouldChangeRegions) {
+        LOG.info("Changing regions in table");
+        admin.disableTable(table.getTableName());
+        while(util.getMiniHBaseCluster().getMaster().getAssignmentManager().
+            getRegionStates().isRegionsInTransition()) {
+          Threads.sleep(200);
+          LOG.info("Waiting on table to finish disabling");
+        }
+        byte[][] newStartKeys = generateRandomStartKeys(15);
+        util.createMultiRegions(
+            util.getConfiguration(), table, FAMILIES[0], newStartKeys);
+        admin.enableTable(table.getTableName());
+        while (table.getRegionLocations().size() != 15 ||
+            !admin.isTableAvailable(table.getTableName())) {
+          Thread.sleep(200);
+          LOG.info("Waiting for new region assignment to happen");
+        }
+      }
+
+      // Perform the actual load
+      new LoadIncrementalHFiles(conf).doBulkLoad(testDir, table);
+
+      // Ensure data shows up
+      int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
+      assertEquals("LoadIncrementalHFiles should put expected data in table",
+          expectedRows, util.countRows(table));
+      Scan scan = new Scan();
+      ResultScanner results = table.getScanner(scan);
+      for (Result res : results) {
+        assertEquals(FAMILIES.length, res.rawCells().length);
+        Cell first = res.rawCells()[0];
+        for (Cell kv : res.rawCells()) {
+          assertTrue(CellUtil.matchingRow(first, kv));
+          assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv)));
+        }
+      }
+      results.close();
+      String tableDigestBefore = util.checksumRows(table);
+
+      // Cause regions to reopen
+      admin.disableTable(TABLE_NAME);
+      while (!admin.isTableDisabled(TABLE_NAME)) {
+        Thread.sleep(200);
+        LOG.info("Waiting for table to disable");
+      }
+      admin.enableTable(TABLE_NAME);
+      util.waitTableAvailable(TABLE_NAME.getName());
+      assertEquals("Data should remain after reopening of regions",
+          tableDigestBefore, util.checksumRows(table));
+    } finally {
+      if (admin != null) admin.close();
+      util.shutdownMiniMapReduceCluster();
+      util.shutdownMiniCluster();
+    }
+  }
+
+  private void runIncrementalPELoad(
+      Configuration conf, HTable table, Path outDir)
+  throws Exception {
+    Job job = new Job(conf, "testLocalMRIncrementalLoad");
+    job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad"));
+    job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
+        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
+        KeyValueSerialization.class.getName());
+    setupRandomGeneratorMapper(job);
+    HFileOutputFormat2.configureIncrementalLoad(job, table);
+    FileOutputFormat.setOutputPath(job, outDir);
+
+    assertFalse(util.getTestFileSystem().exists(outDir));
+
+    assertEquals(table.getRegionLocations().size(), job.getNumReduceTasks());
+
+    assertTrue(job.waitForCompletion(true));
+  }
+
+  /**
+   * Test for
+   * {@link HFileOutputFormat2#createFamilyCompressionMap(Configuration)}. Tests
+   * that the compression map is correctly serialized into and deserialized from
+   * the configuration.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testCreateFamilyCompressionMap() throws IOException {
+    for (int numCfs = 0; numCfs <= 3; numCfs++) {
+      Configuration conf = new Configuration(this.util.getConfiguration());
+      Map<String, Compression.Algorithm> familyToCompression = getMockColumnFamilies(numCfs);
+      HTable table = Mockito.mock(HTable.class);
+      setupMockColumnFamilies(table, familyToCompression);
+      HFileOutputFormat2.configureCompression(table, conf);
+
+      // read back family specific compression setting from the configuration
+      Map<byte[], String> retrievedFamilyToCompressionMap = HFileOutputFormat2.createFamilyCompressionMap(conf);
+
+      // test that we have a value for all column families that matches with the
+      // used mock values
+      for (Entry<String, Algorithm> entry : familyToCompression.entrySet()) {
+        assertEquals("Compression configuration incorrect for column family:" + entry.getKey(), entry.getValue()
+                     .getName(), retrievedFamilyToCompressionMap.get(entry.getKey().getBytes()));
+      }
+    }
+  }
+
+  private void setupMockColumnFamilies(HTable table,
+    Map<String, Compression.Algorithm> familyToCompression) throws IOException
+  {
+    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
+    for (Entry<String, Compression.Algorithm> entry : familyToCompression.entrySet()) {
+      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
+          .setMaxVersions(1)
+          .setCompressionType(entry.getValue())
+          .setBlockCacheEnabled(false)
+          .setTimeToLive(0));
+    }
+    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
+  }
+
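+  /**
+   * Stubs getStartKeys() on the mocked table to return four fixed region start keys;
+   * configureIncrementalLoad derives one reducer per region from these.
+   */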
+  private void setupMockStartKeys(HTable table) throws IOException {
+    byte[][] mockKeys = new byte[][] {
+        HConstants.EMPTY_BYTE_ARRAY,
+        Bytes.toBytes("aaa"),
+        Bytes.toBytes("ggg"),
+        Bytes.toBytes("zzz")
+    };
+    Mockito.doReturn(mockKeys).when(table).getStartKeys();
+  }
+
+  /**
+   * @return a map from column family names to compression algorithms for
+   *         testing column family compression. Column family names include special
+   *         characters.
+   */
+  private Map<String, Compression.Algorithm> getMockColumnFamilies(int numCfs) {
+    Map<String, Compression.Algorithm> familyToCompression = new HashMap<String, Compression.Algorithm>();
+    // use column family names having special characters
+    if (numCfs-- > 0) {
+      familyToCompression.put("Family1!@#!@#&", Compression.Algorithm.LZO);
+    }
+    if (numCfs-- > 0) {
+      familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.SNAPPY);
+    }
+    if (numCfs-- > 0) {
+      familyToCompression.put("Family3=asdads&!AASD", Compression.Algorithm.GZ);
+    }
+    if (numCfs-- > 0) {
+      familyToCompression.put("Family3", Compression.Algorithm.NONE);
+    }
+    return familyToCompression;
+  }
+
+  /**
+   * Test that {@link HFileOutputFormat2} RecordWriter uses compression settings
+   * from the column family descriptor
+   */
+  @Test
+  public void testColumnFamilyCompression() throws Exception {
+    Configuration conf = new Configuration(this.util.getConfiguration());
+    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
+    TaskAttemptContext context = null;
+    Path dir =
+        util.getDataTestDirOnTestFS("testColumnFamilyCompression");
+
+    HTable table = Mockito.mock(HTable.class);
+
+    Map<String, Compression.Algorithm> configuredCompression =
+      new HashMap<String, Compression.Algorithm>();
+    Compression.Algorithm[] supportedAlgos = getSupportedCompressionAlgorithms();
+
+    int familyIndex = 0;
+    for (byte[] family : FAMILIES) {
+      configuredCompression.put(Bytes.toString(family),
+                                supportedAlgos[familyIndex++ % supportedAlgos.length]);
+    }
+    setupMockColumnFamilies(table, configuredCompression);
+
+    // set up the table to return some mock keys
+    setupMockStartKeys(table);
+
+    try {
+      // partial map red setup to get an operational writer for testing
+      // We turn off the sequence file compression, because DefaultCodec
+      // pollutes the GZip codec pool with an incompatible compressor.
+      conf.set("io.seqfile.compression.type", "NONE");
+      Job job = new Job(conf, "testLocalMRIncrementalLoad");
+      job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilyCompression"));
+      setupRandomGeneratorMapper(job);
+      HFileOutputFormat2.configureIncrementalLoad(job, table);
+      FileOutputFormat.setOutputPath(job, dir);
+      context = createTestTaskAttemptContext(job);
+      HFileOutputFormat2 hof = new HFileOutputFormat2();
+      writer = hof.getRecordWriter(context);
+
+      // write out random rows
+      writeRandomKeyValues(writer, context, ROWSPERSPLIT);
+      writer.close(context);
+
+      // Make sure that a directory was created for every CF
+      FileSystem fileSystem = dir.getFileSystem(conf);
+
+      // commit so that the filesystem has one directory per column family
+      hof.getOutputCommitter(context).commitTask(context);
+      hof.getOutputCommitter(context).commitJob(context);
+      for (byte[] family : FAMILIES) {
+        String familyStr = Bytes.toString(family);
+        boolean found = false;
+        for (FileStatus f : fileSystem.listStatus(dir)) {
+          if (familyStr.equals(f.getPath().getName())) {
+            // we found a matching directory
+            found = true;
+
+            // verify that the compression on this file matches the configured
+            // compression
+            Path dataFilePath = fileSystem.listStatus(f.getPath())[0].getPath();
+            Reader reader = HFile.createReader(fileSystem, dataFilePath,
+                new CacheConfig(conf));
+            reader.loadFileInfo();
+            assertEquals("Incorrect compression used for column family " + familyStr
+                         + "(reader: " + reader + ")",
+                         configuredCompression.get(familyStr), reader.getCompressionAlgorithm());
+            break;
+          }
+        }
+
+        if (!found) {
+          fail("HFile for column family " + familyStr + " not found");
+        }
+      }
+
+    } finally {
+      dir.getFileSystem(conf).delete(dir, true);
+    }
+  }
+
+
+  /**
+   * @return the compression algorithms supported by HFile whose codecs can actually
+   *         be loaded on the current machine
+   */
+  private Compression.Algorithm[] getSupportedCompressionAlgorithms() {
+    String[] allAlgos = HFile.getSupportedCompressionAlgorithms();
+    List<Compression.Algorithm> supportedAlgos = Lists.newArrayList();
+
+    for (String algoName : allAlgos) {
+      try {
+        Compression.Algorithm algo = Compression.getCompressionAlgorithmByName(algoName);
+        algo.getCompressor();
+        supportedAlgos.add(algo);
+      } catch (Throwable t) {
+        // this algo is not available
+      }
+    }
+
+    return supportedAlgos.toArray(new Compression.Algorithm[0]);
+  }
+
+
+  /**
+   * Write random values to the writer assuming a table created using
+   * {@link #FAMILIES} as column family descriptors
+   */
+  private void writeRandomKeyValues(RecordWriter<ImmutableBytesWritable, Cell> writer, TaskAttemptContext context,
+      int numRows)
+      throws IOException, InterruptedException {
+    byte[] keyBytes = new byte[Bytes.SIZEOF_INT];
+    int valLength = 10;
+    byte[] valBytes = new byte[valLength];
+
+    int taskId = context.getTaskAttemptID().getTaskID().getId();
+    assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
+
+    Random random = new Random();
+    for (int i = 0; i < numRows; i++) {
+
+      Bytes.putInt(keyBytes, 0, i);
+      random.nextBytes(valBytes);
+      ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
+
+      for (byte[] family : TestHFileOutputFormat2.FAMILIES) {
+        KeyValue kv = new KeyValue(keyBytes, family,
+            PerformanceEvaluation.QUALIFIER_NAME, valBytes);
+        writer.write(key, kv);
+      }
+    }
+  }
+
+  /**
+   * Test the scenario from HBASE-6901: all files are bulk loaded and excluded from
+   * minor compaction. Without the fix for HBASE-6901, an ArrayIndexOutOfBoundsException
+   * would be thrown.
+   */
+  @Ignore("Flaky: see HBASE-9051") @Test
+  public void testExcludeAllFromMinorCompaction() throws Exception {
+    Configuration conf = util.getConfiguration();
+    conf.setInt("hbase.hstore.compaction.min", 2);
+    generateRandomStartKeys(5);
+
+    try {
+      util.startMiniCluster();
+      final FileSystem fs = util.getDFSCluster().getFileSystem();
+      HBaseAdmin admin = new HBaseAdmin(conf);
+      HTable table = util.createTable(TABLE_NAME, FAMILIES);
+      assertEquals("Should start with empty table", 0, util.countRows(table));
+
+      // deep inspection: get the StoreFile dir
+      final Path storePath = HStore.getStoreHomedir(
+          FSUtils.getTableDir(FSUtils.getRootDir(conf), TABLE_NAME),
+          admin.getTableRegions(TABLE_NAME).get(0),
+          FAMILIES[0]);
+      assertEquals(0, fs.listStatus(storePath).length);
+
+      // Generate two bulk load files
+      conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude",
+          true);
+      util.startMiniMapReduceCluster();
+
+      for (int i = 0; i < 2; i++) {
+        Path testDir = util.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i);
+        runIncrementalPELoad(conf, table, testDir);
+        // Perform the actual load
+        new LoadIncrementalHFiles(conf).doBulkLoad(testDir, table);
+      }
+
+      // Ensure data shows up
+      int expectedRows = 2 * NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
+      assertEquals("LoadIncrementalHFiles should put expected data in table",
+          expectedRows, util.countRows(table));
+
+      // should have a second StoreFile now
+      assertEquals(2, fs.listStatus(storePath).length);
+
+      // minor compactions shouldn't get rid of the file
+      admin.compact(TABLE_NAME.getName());
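+      // quickPoll fails with an AssertionError if the condition never holds, which is
+      // what we expect here: the bulk-loaded files are excluded from minor compaction,
+      // so the store file count should never drop to 1.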
+      try {
+        quickPoll(new Callable<Boolean>() {
+          public Boolean call() throws Exception {
+            return fs.listStatus(storePath).length == 1;
+          }
+        }, 5000);
+        throw new IOException("SF# = " + fs.listStatus(storePath).length);
+      } catch (AssertionError ae) {
+        // this is expected behavior
+      }
+
+      // a major compaction should work though
+      admin.majorCompact(TABLE_NAME.getName());
+      quickPoll(new Callable<Boolean>() {
+        public Boolean call() throws Exception {
+          return fs.listStatus(storePath).length == 1;
+        }
+      }, 5000);
+
+    } finally {
+      util.shutdownMiniMapReduceCluster();
+      util.shutdownMiniCluster();
+    }
+  }
+
+  @Test
+  public void testExcludeMinorCompaction() throws Exception {
+    Configuration conf = util.getConfiguration();
+    conf.setInt("hbase.hstore.compaction.min", 2);
+    generateRandomStartKeys(5);
+
+    try {
+      util.startMiniCluster();
+      Path testDir = util.getDataTestDirOnTestFS("testExcludeMinorCompaction");
+      final FileSystem fs = util.getDFSCluster().getFileSystem();
+      HBaseAdmin admin = new HBaseAdmin(conf);
+      HTable table = util.createTable(TABLE_NAME, FAMILIES);
+      assertEquals("Should start with empty table", 0, util.countRows(table));
+
+      // deep inspection: get the StoreFile dir
+      final Path storePath = HStore.getStoreHomedir(
+          FSUtils.getTableDir(FSUtils.getRootDir(conf), TABLE_NAME),
+          admin.getTableRegions(TABLE_NAME).get(0),
+          FAMILIES[0]);
+      assertEquals(0, fs.listStatus(storePath).length);
+
+      // put some data in it and flush to create a storefile
+      Put p = new Put(Bytes.toBytes("test"));
+      p.add(FAMILIES[0], Bytes.toBytes("1"), Bytes.toBytes("1"));
+      table.put(p);
+      admin.flush(TABLE_NAME.getName());
+      assertEquals(1, util.countRows(table));
+      quickPoll(new Callable<Boolean>() {
+        public Boolean call() throws Exception {
+          return fs.listStatus(storePath).length == 1;
+        }
+      }, 5000);
+
+      // Generate a bulk load file with more rows
+      conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude",
+          true);
+      util.startMiniMapReduceCluster();
+      runIncrementalPELoad(conf, table, testDir);
+
+      // Perform the actual load
+      new LoadIncrementalHFiles(conf).doBulkLoad(testDir, table);
+
+      // Ensure data shows up
+      int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
+      assertEquals("LoadIncrementalHFiles should put expected data in table",
+          expectedRows + 1, util.countRows(table));
+
+      // should have a second StoreFile now
+      assertEquals(2, fs.listStatus(storePath).length);
+
+      // minor compactions shouldn't get rid of the file
+      admin.compact(TABLE_NAME.getName());
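+      // As above, quickPoll is expected to throw an AssertionError: the bulk-loaded
+      // file is excluded from minor compaction, so the store file count stays at 2.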
+      try {
+        quickPoll(new Callable<Boolean>() {
+          public Boolean call() throws Exception {
+            return fs.listStatus(storePath).length == 1;
+          }
+        }, 5000);
+        throw new IOException("SF# = " + fs.listStatus(storePath).length);
+      } catch (AssertionError ae) {
+        // this is expected behavior
+      }
+
+      // a major compaction should work though
+      admin.majorCompact(TABLE_NAME.getName());
+      quickPoll(new Callable<Boolean>() {
+        public Boolean call() throws Exception {
+          return fs.listStatus(storePath).length == 1;
+        }
+      }, 5000);
+
+    } finally {
+      util.shutdownMiniMapReduceCluster();
+      util.shutdownMiniCluster();
+    }
+  }
+
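+  /**
+   * Polls the condition roughly every 10ms until it returns true or waitMs elapses,
+   * failing the test if it never becomes true.
+   */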
+  private void quickPoll(Callable<Boolean> c, int waitMs) throws Exception {
+    int sleepMs = 10;
+    int retries = (int) Math.ceil(((double) waitMs) / sleepMs);
+    while (retries-- > 0) {
+      if (c.call().booleanValue()) {
+        return;
+      }
+      Thread.sleep(sleepMs);
+    }
+    fail();
+  }
+
+  public static void main(String[] args) throws Exception {
+    new TestHFileOutputFormat2().manualTest(args);
+  }
+
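+  /**
+   * Manual driver: "newtable TABLENAME" creates a pre-split table;
+   * "incremental TABLENAME" writes HFiles for an existing table via the incremental
+   * PE load job into ./incremental-out.
+   */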
+  public void manualTest(String[] args) throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    util = new HBaseTestingUtility(conf);
+    if ("newtable".equals(args[0])) {
+      byte[] tname = args[1].getBytes();
+      HTable table = util.createTable(tname, FAMILIES);
+      HBaseAdmin admin = new HBaseAdmin(conf);
+      admin.disableTable(tname);
+      byte[][] startKeys = generateRandomStartKeys(5);
+      util.createMultiRegions(conf, table, FAMILIES[0], startKeys);
+      admin.enableTable(tname);
+    } else if ("incremental".equals(args[0])) {
+      byte[] tname = args[1].getBytes();
+      HTable table = new HTable(conf, tname);
+      Path outDir = new Path("incremental-out");
+      runIncrementalPELoad(conf, table, outDir);
+    } else {
+      throw new RuntimeException(
+          "usage: TestHFileOutputFormat2 newtable | incremental");
+    }
+  }
+
+}
+

Propchange: hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
------------------------------------------------------------------------------
    svn:eol-style = native


