Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 821A5200CAD for ; Wed, 28 Jun 2017 17:01:14 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 809ED160BFF; Wed, 28 Jun 2017 15:01:14 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 2A9FB160BE8 for ; Wed, 28 Jun 2017 17:01:12 +0200 (CEST) Received: (qmail 65078 invoked by uid 500); 28 Jun 2017 15:01:09 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 64068 invoked by uid 99); 28 Jun 2017 15:01:08 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 28 Jun 2017 15:01:08 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B4FCBF3243; Wed, 28 Jun 2017 15:01:05 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: git-site-role@apache.org To: commits@hbase.apache.org Date: Wed, 28 Jun 2017 15:01:25 -0000 Message-Id: <9ca9bf8d8d2c43e193947bd8c96acf58@git.apache.org> In-Reply-To: <4a3527e9985f440e86627a5c1aa5b2df@git.apache.org> References: <4a3527e9985f440e86627a5c1aa5b2df@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [21/38] hbase-site git commit: Published site at 82d554e3783372cc6b05489452c815b57c06f6cd. 
archived-at: Wed, 28 Jun 2017 15:01:14 -0000 http://git-wip-us.apache.org/repos/asf/hbase-site/blob/2848f431/devapidocs/src-html/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.HFileRecordWriter.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.HFileRecordWriter.html b/devapidocs/src-html/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.HFileRecordWriter.html deleted file mode 100644 index e6ae33a..0000000 --- a/devapidocs/src-html/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.HFileRecordWriter.html +++ /dev/null @@ -1,913 +0,0 @@ - - - -Source code - - - -
-
001/**
-002 * Licensed to the Apache Software Foundation (ASF) under one
-003 * or more contributor license agreements.  See the NOTICE file
-004 * distributed with this work for additional information
-005 * regarding copyright ownership.  The ASF licenses this file
-006 * to you under the Apache License, Version 2.0 (the
-007 * "License"); you may not use this file except in compliance
-008 * with the License.  You may obtain a copy of the License at
-009 *
-010 *     http://www.apache.org/licenses/LICENSE-2.0
-011 *
-012 * Unless required by applicable law or agreed to in writing, software
-013 * distributed under the License is distributed on an "AS IS" BASIS,
-014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-015 * See the License for the specific language governing permissions and
-016 * limitations under the License.
-017 */
-018package org.apache.hadoop.hbase.mapreduce;
-019
-020import java.io.IOException;
-021import java.io.UnsupportedEncodingException;
-022import java.net.InetSocketAddress;
-023import java.net.URLDecoder;
-024import java.net.URLEncoder;
-025import java.util.ArrayList;
-026import java.util.Collection;
-027import java.util.List;
-028import java.util.Map;
-029import java.util.TreeMap;
-030import java.util.TreeSet;
-031import java.util.UUID;
-032
-033import org.apache.commons.logging.Log;
-034import org.apache.commons.logging.LogFactory;
-035import org.apache.hadoop.conf.Configuration;
-036import org.apache.hadoop.fs.FileSystem;
-037import org.apache.hadoop.fs.Path;
-038import org.apache.hadoop.hbase.Cell;
-039import org.apache.hadoop.hbase.CellComparator;
-040import org.apache.hadoop.hbase.CellUtil;
-041import org.apache.hadoop.hbase.HColumnDescriptor;
-042import org.apache.hadoop.hbase.HConstants;
-043import org.apache.hadoop.hbase.HRegionLocation;
-044import org.apache.hadoop.hbase.HTableDescriptor;
-045import org.apache.hadoop.hbase.KeyValue;
-046import org.apache.hadoop.hbase.KeyValueUtil;
-047import org.apache.hadoop.hbase.TableName;
-048import org.apache.hadoop.hbase.classification.InterfaceAudience;
-049import org.apache.hadoop.hbase.client.Connection;
-050import org.apache.hadoop.hbase.client.ConnectionFactory;
-051import org.apache.hadoop.hbase.client.Put;
-052import org.apache.hadoop.hbase.client.RegionLocator;
-053import org.apache.hadoop.hbase.client.Table;
-054import org.apache.hadoop.hbase.fs.HFileSystem;
-055import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-056import org.apache.hadoop.hbase.io.compress.Compression;
-057import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
-058import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
-059import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-060import org.apache.hadoop.hbase.io.hfile.HFile;
-061import org.apache.hadoop.hbase.io.hfile.HFileContext;
-062import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
-063import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
-064import org.apache.hadoop.hbase.regionserver.BloomType;
-065import org.apache.hadoop.hbase.regionserver.HStore;
-066import org.apache.hadoop.hbase.regionserver.StoreFile;
-067import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
-068import org.apache.hadoop.hbase.util.Bytes;
-069import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-070import org.apache.hadoop.hbase.util.FSUtils;
-071import org.apache.hadoop.io.NullWritable;
-072import org.apache.hadoop.io.SequenceFile;
-073import org.apache.hadoop.io.Text;
-074import org.apache.hadoop.mapreduce.Job;
-075import org.apache.hadoop.mapreduce.OutputFormat;
-076import org.apache.hadoop.mapreduce.RecordWriter;
-077import org.apache.hadoop.mapreduce.TaskAttemptContext;
-078import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-079import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-080import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
-081
-082import com.google.common.annotations.VisibleForTesting;
-083
-084/**
-085 * Writes HFiles. Passed Cells must arrive in order.
-086 * Writes current time as the sequence id for the file. Sets the major compacted
-087 * attribute on created {@link HFile}s. Calling write(null,null) will forcibly roll
-088 * all HFiles being written.
-089 * <p>
-090 * Using this class as part of a MapReduce job is best done
-091 * using {@link #configureIncrementalLoad(Job, HTableDescriptor, RegionLocator)}.
-092 */
-093@InterfaceAudience.Public
-094public class HFileOutputFormat2
-095    extends FileOutputFormat<ImmutableBytesWritable, Cell> {
-096  private static final Log LOG = LogFactory.getLog(HFileOutputFormat2.class);
-097
-098  // The following constants are private since these are used by
-099  // HFileOutputFormat2 to internally transfer data between job setup and
-100  // reducer run using conf.
-101  // These should not be changed by the client.
-102  private static final String COMPRESSION_FAMILIES_CONF_KEY =
-103      "hbase.hfileoutputformat.families.compression";
-104  private static final String BLOOM_TYPE_FAMILIES_CONF_KEY =
-105      "hbase.hfileoutputformat.families.bloomtype";
-106  private static final String BLOCK_SIZE_FAMILIES_CONF_KEY =
-107      "hbase.mapreduce.hfileoutputformat.blocksize";
-108  private static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
-109      "hbase.mapreduce.hfileoutputformat.families.datablock.encoding";
-110
-111  // This constant is public since the client can modify this when setting
-112  // up their conf object and thus refer to this symbol.
-113  // It is present for backwards compatibility reasons. Use it only to
-114  // override the auto-detection of datablock encoding.
-115  public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
-116      "hbase.mapreduce.hfileoutputformat.datablock.encoding";
-117
-118  /**
-119   * Keep locality while generating HFiles for bulkload. See HBASE-12596
-120   */
-121  public static final String LOCALITY_SENSITIVE_CONF_KEY =
-122      "hbase.bulkload.locality.sensitive.enabled";
-123  private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;
-124  private static final String OUTPUT_TABLE_NAME_CONF_KEY =
-125      "hbase.mapreduce.hfileoutputformat.table.name";
-126
-127  public static final String STORAGE_POLICY_PROPERTY = "hbase.hstore.storagepolicy";
-128  public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + ".";
-129
-130  @Override
-131  public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(
-132      final TaskAttemptContext context) throws IOException, InterruptedException {
-133    return createRecordWriter(context);
-134  }
-135
-136  static <V extends Cell> RecordWriter<ImmutableBytesWritable, V>
-137  createRecordWriter(final TaskAttemptContext context) throws IOException {
-138    return new HFileRecordWriter<>(context, null);
-139  }
-140
-141  protected static class HFileRecordWriter<V extends Cell>
-142      extends RecordWriter<ImmutableBytesWritable, V> {
-143    private final TaskAttemptContext context;
-144    private final Path outputPath;
-145    private final Path outputDir;
-146    private final Configuration conf;
-147    private final FileSystem fs;
-148
-149    private final long maxsize;
-150
-151    private final Algorithm defaultCompression;
-152    private final boolean compactionExclude;
-153
-154    private final Map<byte[], Algorithm> compressionMap;
-155    private final Map<byte[], BloomType> bloomTypeMap;
-156    private final Map<byte[], Integer> blockSizeMap;
-157
-158    private final Map<byte[], DataBlockEncoding> datablockEncodingMap;
-159    private final DataBlockEncoding overriddenEncoding;
-160
-161    private final Map<byte[], WriterLength> writers;
-162    private byte[] previousRow;
-163    private final byte[] now;
-164    private boolean rollRequested;
-165
-166    /**
-167     * MapReduce job will create a temp path for outputting results. If out != null, it means that
-168     * the caller has set the temp working dir; if out == null, it means we need to set it here.
-169     * Used by HFileOutputFormat2 and MultiTableHFileOutputFormat. MultiTableHFileOutputFormat will give us
-170     * temp working dir at the table level and HFileOutputFormat2 has to set it here within this
-171     * constructor.
-172     */
-173    public HFileRecordWriter(final TaskAttemptContext taContext, final Path out)
-174        throws IOException {
-175      // Get the path of the temporary output file
-176      context = taContext;
-177
-178      if (out == null) {
-179        outputPath = FileOutputFormat.getOutputPath(context);
-180        outputDir = new FileOutputCommitter(outputPath, context).getWorkPath();
-181      } else {
-182        outputPath = out;
-183        outputDir = outputPath;
-184      }
-185
-186      conf = context.getConfiguration();
-187      fs = outputDir.getFileSystem(conf);
-188
-189      // These configs. are from hbase-*.xml
-190      maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE);
-191
-192      // Invented config. Add to hbase-*.xml if other than default compression.
-193      String defaultCompressionStr = conf.get("hfile.compression", Compression.Algorithm.NONE.getName());
-194      defaultCompression = HFileWriterImpl.compressionByName(defaultCompressionStr);
-195      compactionExclude =
-196          conf.getBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
-197
-198      // create a map from column family to the compression algorithm
-199      compressionMap = createFamilyCompressionMap(conf);
-200      bloomTypeMap = createFamilyBloomTypeMap(conf);
-201      blockSizeMap = createFamilyBlockSizeMap(conf);
-202
-203      // Config for data block encoding
-204      String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
-205      datablockEncodingMap = createFamilyDataBlockEncodingMap(conf);
-206      if (dataBlockEncodingStr != null) {
-207        overriddenEncoding = DataBlockEncoding.valueOf(dataBlockEncodingStr);
-208      } else {
-209        overriddenEncoding = null;
-210      }
-211
-212      writers = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-213      previousRow = HConstants.EMPTY_BYTE_ARRAY;
-214      now = Bytes.toBytes(EnvironmentEdgeManager.currentTime());
-215      rollRequested = false;
-216    }
-217
-218    @Override
-219    public void write(ImmutableBytesWritable row, V cell) throws IOException {
-220      KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
-221
-222      // null input == user explicitly wants to flush
-223      if (row == null && kv == null) {
-224        rollWriters();
-225        return;
-226      }
-227
-228      byte[] rowKey = CellUtil.cloneRow(kv);
-229      long length = kv.getLength();
-230      byte[] family = CellUtil.cloneFamily(kv);
-231      WriterLength wl = this.writers.get(family);
-232
-233      // If this is a new column family, create its directory and apply the storage policy
-234      if (wl == null) {
-235        Path cfPath = new Path(outputDir, Bytes.toString(family));
-236        fs.mkdirs(cfPath);
-237        configureStoragePolicy(conf, fs, family, cfPath);
-238      }
-239
-240      // If any of the HFiles for the column families has reached
-241      // maxsize, we need to roll all the writers
-242      if (wl != null && wl.written + length >= maxsize) {
-243        this.rollRequested = true;
-244      }
-245
-246      // This can only happen once a row is finished though
-247      if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
-248        rollWriters();
-249      }
-250
-251      // create a new StoreFile writer, if necessary
-252      if (wl == null || wl.writer == null) {
-253        if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
-254          HRegionLocation loc = null;
-255          String tableName = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
-256          if (tableName != null) {
-257            try (Connection connection = ConnectionFactory.createConnection(conf);
-258                RegionLocator locator = connection.getRegionLocator(TableName.valueOf(tableName))) {
-259              loc = locator.getRegionLocation(rowKey);
-260            } catch (Throwable e) {
-261              LOG.warn("there's something wrong when locating rowkey: " + Bytes.toString(rowKey),
-262                  e);
-263              loc = null;
-264            }
-265          }
-266
-267          if (null == loc) {
-268            if (LOG.isTraceEnabled()) {
-269              LOG.trace(
-270                  "failed to get region location, so use default writer: " + Bytes.toString(rowKey));
-271            }
-272            wl = getNewWriter(family, conf, null);
-273          } else {
-274            if (LOG.isDebugEnabled()) {
-275              LOG.debug("first rowkey: [" + Bytes.toString(rowKey) + "]");
-276            }
-277            InetSocketAddress initialIsa = new InetSocketAddress(loc.getHostname(), loc.getPort());
-278            if (initialIsa.isUnresolved()) {
-279              if (LOG.isTraceEnabled()) {
-280                LOG.trace("failed to resolve bind address: " + loc.getHostname() + ":"
-281                    + loc.getPort() + ", so use default writer");
-282              }
-283              wl = getNewWriter(family, conf, null);
-284            } else {
-285              if (LOG.isDebugEnabled()) {
-286                LOG.debug("use favored nodes writer: " + initialIsa.getHostString());
-287              }
-288              wl = getNewWriter(family, conf, new InetSocketAddress[] { initialIsa });
-289            }
-290          }
-291        } else {
-292          wl = getNewWriter(family, conf, null);
-293        }
-294      }
-295
-296      // we now have the proper StoreFile writer. full steam ahead
-297      kv.updateLatestStamp(this.now);
-298      wl.writer.append(kv);
-299      wl.written += length;
-300
-301      // Remember this row so we can detect when the next cell starts a new row.
-302      this.previousRow = rowKey;
-303    }
-304
-305    private void rollWriters() throws IOException {
-306      for (WriterLength wl : this.writers.values()) {
-307        if (wl.writer != null) {
-308          LOG.info(
-309              "Writer=" + wl.writer.getPath() + ((wl.written == 0) ? "" : ", wrote=" + wl.written));
-310          close(wl.writer);
-311        }
-312        wl.writer = null;
-313        wl.written = 0;
-314      }
-315      this.rollRequested = false;
-316    }
-317
-318    /*
-319     * Create a new StoreFile.Writer.
-320     * @param family
-321     * @return A WriterLength, containing a new StoreFile.Writer.
-322     * @throws IOException
-323     */
-324    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "BX_UNBOXING_IMMEDIATELY_REBOXED",
-325        justification = "Not important")
-326    private WriterLength getNewWriter(byte[] family, Configuration conf,
-327        InetSocketAddress[] favoredNodes) throws IOException {
-328      WriterLength wl = new WriterLength();
-329      Path familyDir = new Path(outputDir, Bytes.toString(family));
-330      Algorithm compression = compressionMap.get(family);
-331      compression = compression == null ? defaultCompression : compression;
-332      BloomType bloomType = bloomTypeMap.get(family);
-333      bloomType = bloomType == null ? BloomType.NONE : bloomType;
-334      Integer blockSize = blockSizeMap.get(family);
-335      blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
-336      DataBlockEncoding encoding = overriddenEncoding;
-337      encoding = encoding == null ? datablockEncodingMap.get(family) : encoding;
-338      encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
-339      Configuration tempConf = new Configuration(conf);
-340      tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
-341      HFileContextBuilder contextBuilder = new HFileContextBuilder().withCompression(compression)
-342          .withChecksumType(HStore.getChecksumType(conf))
-343          .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)).withBlockSize(blockSize);
-344
-345      if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
-346        contextBuilder.withIncludesTags(true);
-347      }
-348
-349      contextBuilder.withDataBlockEncoding(encoding);
-350      HFileContext hFileContext = contextBuilder.build();
-351
-352      if (null == favoredNodes) {
-353        wl.writer = new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), fs)
-354            .withOutputDir(familyDir).withBloomType(bloomType)
-355            .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext).build();
-356      } else {
-357        wl.writer =
-358            new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
-359                .withOutputDir(familyDir).withBloomType(bloomType)
-360                .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext)
-361                .withFavoredNodes(favoredNodes).build();
-362      }
-363
-364      this.writers.put(family, wl);
-365      return wl;
-366    }
-367
-368    private void close(final StoreFileWriter w) throws IOException {
-369      if (w != null) {
-370        w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(EnvironmentEdgeManager.currentTime()));
-371        w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
-372            Bytes.toBytes(context.getTaskAttemptID().toString()));
-373        w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY, Bytes.toBytes(true));
-374        w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
-375            Bytes.toBytes(compactionExclude));
-376        w.appendTrackedTimestampsToMetadata();
-377        w.close();
-378      }
-379    }
-380
-381    @Override
-382    public void close(TaskAttemptContext c) throws IOException, InterruptedException {
-383      for (WriterLength wl : this.writers.values()) {
-384        close(wl.writer);
-385      }
-386    }
-387  }
-388
-389  /**
-390   * Configure block storage policy for CF after the directory is created.
-391   */
-392  static void configureStoragePolicy(final Configuration conf, final FileSystem fs,
-393      byte[] family, Path cfPath) {
-394    if (null == conf || null == fs || null == family || null == cfPath) {
-395      return;
-396    }
-397
-398    String policy =
-399        conf.get(STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(family),
-400          conf.get(STORAGE_POLICY_PROPERTY));
-401    FSUtils.setStoragePolicy(fs, cfPath, policy);
-402  }
-403
-404  /*
-405   * Data structure to hold a Writer and amount of data written on it.
-406   */
-407  static class WriterLength {
-408    long written = 0;
-409    StoreFileWriter writer = null;
-410  }
-411
-412  /**
-413   * Return the start keys of all of the regions in this table,
-414   * as a list of ImmutableBytesWritable.
-415   */
-416  private static List<ImmutableBytesWritable> getRegionStartKeys(RegionLocator table)
-417  throws IOException {
-418    byte[][] byteKeys = table.getStartKeys();
-419    ArrayList<ImmutableBytesWritable> ret = new ArrayList<>(byteKeys.length);
-420    for (byte[] byteKey : byteKeys) {
-421      ret.add(new ImmutableBytesWritable(byteKey));
-422    }
-423    return ret;
-424  }
-425
-426  /**
-427   * Write out a {@link SequenceFile} that can be read by
-428   * {@link TotalOrderPartitioner} that contains the split points in startKeys.
-429   */
-430  @SuppressWarnings("deprecation")
-431  private static void writePartitions(Configuration conf, Path partitionsPath,
-432      List<ImmutableBytesWritable> startKeys) throws IOException {
-433    LOG.info("Writing partition information to " + partitionsPath);
-434    if (startKeys.isEmpty()) {
-435      throw new IllegalArgumentException("No regions passed");
-436    }
-437
-438    // We're generating a list of split points, and we don't ever
-439    // have keys < the first region (which has an empty start key)
-440    // so we need to remove it. Otherwise we would end up with an
-441    // empty reducer with index 0
-442    TreeSet<ImmutableBytesWritable> sorted = new TreeSet<>(startKeys);
-443
-444    ImmutableBytesWritable first = sorted.first();
-445    if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
-446      throw new IllegalArgumentException(
-447          "First region of table should have empty start key. Instead has: "
-448          + Bytes.toStringBinary(first.get()));
-449    }
-450    sorted.remove(first);
-451
-452    // Write the actual file
-453    FileSystem fs = partitionsPath.getFileSystem(conf);
-454    SequenceFile.Writer writer = SequenceFile.createWriter(
-455      fs, conf, partitionsPath, ImmutableBytesWritable.class,
-456      NullWritable.class);
-457
-458    try {
-459      for (ImmutableBytesWritable startKey : sorted) {
-460        writer.append(startKey, NullWritable.get());
-461      }
-462    } finally {
-463      writer.close();
-464    }
-465  }
-466
-467  /**
-468   * Configure a MapReduce Job to perform an incremental load into the given
-469   * table. This
-470   * <ul>
-471   *   <li>Inspects the table to configure a total order partitioner</li>
-472   *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
-473   *   <li>Sets the number of reduce tasks to match the current number of regions</li>
-474   *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
-475   *   <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
-476   *     PutSortReducer)</li>
-477   * </ul>
-478   * The user should be sure to set the map output value class to either KeyValue or Put before
-479   * running this function.
-480   */
-481  public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator)
-482      throws IOException {
-483    configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
-484  }
-485
-486  /**
-487   * Configure a MapReduce Job to perform an incremental load into the given
-488   * table. This
-489   * <ul>
-490   *   <li>Inspects the table to configure a total order partitioner</li>
-491   *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
-492   *   <li>Sets the number of reduce tasks to match the current number of regions</li>
-493   *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
-494   *   <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
-495   *     PutSortReducer)</li>
-496   * </ul>
-497   * The user should be sure to set the map output value class to either KeyValue or Put before
-498   * running this function.
-499   */
-500  public static void configureIncrementalLoad(Job job, HTableDescriptor tableDescriptor,
-501      RegionLocator regionLocator) throws IOException {
-502    configureIncrementalLoad(job, tableDescriptor, regionLocator, HFileOutputFormat2.class);
-503  }
-504
-505  static void configureIncrementalLoad(Job job, HTableDescriptor tableDescriptor,
-506      RegionLocator regionLocator, Class<? extends OutputFormat<?, ?>> cls) throws IOException,
-507      UnsupportedEncodingException {
-508    Configuration conf = job.getConfiguration();
-509    job.setOutputKeyClass(ImmutableBytesWritable.class);
-510    job.setOutputValueClass(KeyValue.class);
-511    job.setOutputFormatClass(cls);
-512
-513    // Based on the configured map output class, set the correct reducer to properly
-514    // sort the incoming values.
-515    // TODO it would be nice to pick one or the other of these formats.
-516    if (KeyValue.class.equals(job.getMapOutputValueClass())) {
-517      job.setReducerClass(KeyValueSortReducer.class);
-518    } else if (Put.class.equals(job.getMapOutputValueClass())) {
-519      job.setReducerClass(PutSortReducer.class);
-520    } else if (Text.class.equals(job.getMapOutputValueClass())) {
-521      job.setReducerClass(TextSortReducer.class);
-522    } else {
-523      LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
-524    }
-525
-526    conf.setStrings("io.serializations", conf.get("io.serializations"),
-527        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
-528        KeyValueSerialization.class.getName());
-529
-530    if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
-531      // record this table name for creating writer by favored nodes
-532      LOG.info("bulkload locality sensitive enabled");
-533      conf.set(OUTPUT_TABLE_NAME_CONF_KEY, regionLocator.getName().getNameAsString());
-534    }
-535
-536    // Use table's region boundaries for TOP split points.
-537    LOG.info("Looking up current regions for table " + regionLocator.getName());
-538    List<ImmutableBytesWritable> startKeys = getRegionStartKeys(regionLocator);
-539    LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
-540        "to match current region count");
-541    job.setNumReduceTasks(startKeys.size());
-542
-543    configurePartitioner(job, startKeys);
-544    // Set compression algorithms based on column families
-545    configureCompression(conf, tableDescriptor);
-546    configureBloomType(tableDescriptor, conf);
-547    configureBlockSize(tableDescriptor, conf);
-548    configureDataBlockEncoding(tableDescriptor, conf);
-549
-550    TableMapReduceUtil.addDependencyJars(job);
-551    TableMapReduceUtil.initCredentials(job);
-552    LOG.info("Incremental table " + regionLocator.getName() + " output configured.");
-553  }
-554
-555  public static void configureIncrementalLoadMap(Job job, HTableDescriptor tableDescriptor) throws
-556      IOException {
-557    Configuration conf = job.getConfiguration();
-558
-559    job.setOutputKeyClass(ImmutableBytesWritable.class);
-560    job.setOutputValueClass(KeyValue.class);
-561    job.setOutputFormatClass(HFileOutputFormat2.class);
-562
-563    // Set compression algorithms based on column families
-564    configureCompression(conf, tableDescriptor);
-565    configureBloomType(tableDescriptor, conf);
-566    configureBlockSize(tableDescriptor, conf);
-567    configureDataBlockEncoding(tableDescriptor, conf);
-568
-569    TableMapReduceUtil.addDependencyJars(job);
-570    TableMapReduceUtil.initCredentials(job);
-571    LOG.info("Incremental table " + tableDescriptor.getTableName() + " output configured.");
-572  }
-573
-574  /**
-575   * Runs inside the task to deserialize column family to compression algorithm
-576   * map from the configuration.
-577   *
-578   * @param conf to read the serialized values from
-579   * @return a map from column family to the configured compression algorithm
-580   */
-581  @VisibleForTesting
-582  static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration
-583      conf) {
-584    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
-585        COMPRESSION_FAMILIES_CONF_KEY);
-586    Map<byte[], Algorithm> compressionMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-587    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
-588      Algorithm algorithm = HFileWriterImpl.compressionByName(e.getValue());
-589      compressionMap.put(e.getKey(), algorithm);
-590    }
-591    return compressionMap;
-592  }
-593
-594  /**
-595   * Runs inside the task to deserialize column family to bloom filter type
-596   * map from the configuration.
-597   *
-598   * @param conf to read the serialized values from
-599   * @return a map from column family to the configured bloom filter type
-600   */
-601  @VisibleForTesting
-602  static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf) {
-603    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
-604        BLOOM_TYPE_FAMILIES_CONF_KEY);
-605    Map<byte[], BloomType> bloomTypeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-606    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
-607      BloomType bloomType = BloomType.valueOf(e.getValue());
-608      bloomTypeMap.put(e.getKey(), bloomType);
-609    }
-610    return bloomTypeMap;
-611  }
-612
-613  /**
-614   * Runs inside the task to deserialize column family to block size
-615   * map from the configuration.
-616   *
-617   * @param conf to read the serialized values from
-618   * @return a map from column family to the configured block size
-619   */
-620  @VisibleForTesting
-621  static Map<byte[], Integer> createFamilyBlockSizeMap(Configuration conf) {
-622    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
-623        BLOCK_SIZE_FAMILIES_CONF_KEY);
-624    Map<byte[], Integer> blockSizeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-625    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
-626      Integer blockSize = Integer.parseInt(e.getValue());
-627      blockSizeMap.put(e.getKey(), blockSize);
-628    }
-629    return blockSizeMap;
-630  }
-631
-632  /**
-633   * Runs inside the task to deserialize column family to data block encoding
-634   * type map from the configuration.
-635   *
-636   * @param conf to read the serialized values from
-637   * @return a map from column family to HFileDataBlockEncoder for the
-638   *         configured data block type for the family
-639   */
-640  @VisibleForTesting
-641  static Map<byte[], DataBlockEncoding> createFamilyDataBlockEncodingMap(
-642      Configuration conf) {
-643    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
-644        DATABLOCK_ENCODING_FAMILIES_CONF_KEY);
-645    Map<byte[], DataBlockEncoding> encoderMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-646    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
-647      encoderMap.put(e.getKey(), DataBlockEncoding.valueOf((e.getValue())));
-648    }
-649    return encoderMap;
-650  }
-651
-652
-653  /**
-654   * Run inside the task to deserialize column family to given conf value map.
-655   *
-656   * @param conf to read the serialized values from
-657   * @param confName conf key to read from the configuration
-658   * @return a map of column family to the given configuration value
-659   */
-660  private static Map<byte[], String> createFamilyConfValueMap(
-661      Configuration conf, String confName) {
-662    Map<byte[], String> confValMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-663    String confVal = conf.get(confName, "");
-664    for (String familyConf : confVal.split("&")) {
-665      String[] familySplit = familyConf.split("=");
-666      if (familySplit.length != 2) {
-667        continue;
-668      }
-669      try {
-670        confValMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(),
-671            URLDecoder.decode(familySplit[1], "UTF-8"));
-672      } catch (UnsupportedEncodingException e) {
-673        // will not happen with UTF-8 encoding
-674        throw new AssertionError(e);
-675      }
-676    }
-677    return confValMap;
-678  }
-679
-680  /**
-681   * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
-682   * <code>splitPoints</code>. Cleans up the partitions file after job exits.
-683   */
-684  static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints)
-685      throws IOException {
-686    Configuration conf = job.getConfiguration();
-687    // create the partitions file
-688    FileSystem fs = FileSystem.get(conf);
-689    String hbaseTmpFsDir =
-690        conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
-691          HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
-692    Path partitionsPath = new Path(hbaseTmpFsDir, "partitions_" + UUID.randomUUID());
-693    fs.makeQualified(partitionsPath);
-694    writePartitions(conf, partitionsPath, splitPoints);
-695    fs.deleteOnExit(partitionsPath);
-696
-697    // configure job to use it
-698    job.setPartitionerClass(TotalOrderPartitioner.class);
-699    TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
-700  }
-701
-702  /**
-703   * Serialize column family to compression algorithm map to configuration.
-704   * Invoked while configuring the MR job for incremental load.
-705   *
-706   * @param tableDescriptor to read the properties from
-707   * @param conf to persist serialized values into
-708   * @throws IOException
-709   *           on failure to read column family descriptors
-710   */
-711  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
-712      value="RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
-713  @VisibleForTesting
-714  static void configureCompression(Configuration conf, HTableDescriptor tableDescriptor)
-715      throws UnsupportedEncodingException {
-716    StringBuilder compressionConfigValue = new StringBuilder();
-717    if(tableDescriptor == null){
-718      // could happen with mock table instance
-719      return;
-720    }
-721    Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
-722    int i = 0;
-723    for (HColumnDescriptor familyDescriptor : families) {
-724      if (i++ > 0) {
-725        compressionConfigValue.append('&');
-726      }
-727      compressionConfigValue.append(URLEncoder.encode(
-728        familyDescriptor.getNameAsString(), "UTF-8"));
-729      compressionConfigValue.append('=');
-730      compressionConfigValue.append(URLEncoder.encode(
-731        familyDescriptor.getCompressionType().getName(), "UTF-8"));
-732    }
-733    // Get rid of the last ampersand
-734    conf.set(COMPRESSION_FAMILIES_CONF_KEY, compressionConfigValue.toString());
-735  }
-736
-737  /**
-738   * Serialize column family to block size map to configuration.
-739   * Invoked while configuring the MR job for incremental load.
-740   * @param tableDescriptor to read the properties from
-741   * @param conf to persist serialized values into
-742   *
-743   * @throws IOException
-744   *           on failure to read column family descriptors
-745   */
-746  @VisibleForTesting
-747  static void configureBlockSize(HTableDescriptor tableDescriptor, Configuration conf)
-748      throws UnsupportedEncodingException {
-749    StringBuilder blockSizeConfigValue = new StringBuilder();
-750    if (tableDescriptor == null) {
-751      // could happen with mock table instance
-752      return;
-753    }
-754    Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
-755    int i = 0;
-756    for (HColumnDescriptor familyDescriptor : families) {
-757      if (i++ > 0) {
-758        blockSizeConfigValue.append('&');
-759      }
-760      blockSizeConfigValue.append(URLEncoder.encode(
-761          familyDescriptor.getNameAsString(), "UTF-8"));
-762      blockSizeConfigValue.append('=');
-763      blockSizeConfigValue.append(URLEncoder.encode(
-764          String.valueOf(familyDescriptor.getBlocksize()), "UTF-8"));
-765    }
-766    // Get rid of the last ampersand
-767    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY, blockSizeConfigValue.toString());
-768  }
-769
-770  /**
-771   * Serialize column family to bloom type map to configuration.
-772   * Invoked while configuring the MR job for incremental load.
-773   * @param tableDescriptor to read the properties from
-774   * @param conf to persist serialized values into
-775   *
-776   * @throws IOException
-777   *           on failure to read column family descriptors
-778   */
-779  @VisibleForTesting
-780  static void configureBloomType(HTableDescriptor tableDescriptor, Configuration conf)
-781      throws UnsupportedEncodingException {
-782    if (tableDescriptor == null) {
-783      // could happen with mock table instance
-784      return;
-785    }
-786    StringBuilder bloomTypeConfigValue = new StringBuilder();
-787    Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
-788    int i = 0;
-789    for (HColumnDescriptor familyDescriptor : families) {
-790      if (i++ > 0) {
-791        bloomTypeConfigValue.append('&');
-792      }
-793      bloomTypeConfigValue.append(URLEncoder.encode(
-794        familyDescriptor.getNameAsString(), "UTF-8"));
-795      bloomTypeConfigValue.append('=');
-796      String bloomType = familyDescriptor.getBloomFilterType().toString();
-797      if (bloomType == null) {
-798        bloomType = HColumnDescriptor.DEFAULT_BLOOMFILTER;
-799      }
-800      bloomTypeConfigValue.append(URLEncoder.encode(bloomType, "UTF-8"));
-801    }
-802    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY, bloomTypeConfigValue.toString());
-803  }
-804
-805  /**
-806   * Serialize column family to data block encoding map to configuration.
-807   * Invoked while configuring the MR job for incremental load.
-808   *
-809   * @param tableDescriptor to read the properties from
-810   * @param conf to persist serialized values into
-811   * @throws IOException
-812   *           on failure to read column family descriptors
-813   */
-814  @VisibleForTesting
-815  static void configureDataBlockEncoding(HTableDescriptor tableDescriptor,
-816      Configuration conf) throws UnsupportedEncodingException {
-817    if (tableDescriptor == null) {
-818      // could happen with mock table instance
-819      return;
-820    }
-821    StringBuilder dataBlockEncodingConfigValue = new StringBuilder();
-822    Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
-823    int i = 0;
-824    for (HColumnDescriptor familyDescriptor : families) {
-825      if (i++ > 0) {
-826        dataBlockEncodingConfigValue.append('&');
-827      }
-828      dataBlockEncodingConfigValue.append(
-829          URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8"));
-830      dataBlockEncodingConfigValue.append('=');
-831      DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding();
-832      if (encoding == null) {
-833        encoding = DataBlockEncoding.NONE;
-834      }
-835      dataBlockEncodingConfigValue.append(URLEncoder.encode(encoding.toString(),
-836          "UTF-8"));
-837    }
-838    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
-839        dataBlockEncodingConfigValue.toString());
-840  }
-841}
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- -