crunch-commits mailing list archives

From chao...@apache.org
Subject [1/3] CRUNCH-212: Target wrapper for HFileOutputFormat
Date Fri, 02 Aug 2013 15:20:57 GMT
Updated Branches:
  refs/heads/master d4a06967e -> 92ea0592f


http://git-wip-us.apache.org/repos/asf/crunch/blob/92ea0592/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java
b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java
new file mode 100644
index 0000000..311d91c
--- /dev/null
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java
@@ -0,0 +1,133 @@
+/**
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
+import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
+import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import java.io.IOException;
+
+/**
+ * This is a thin wrapper around {@link HFile.Writer}. It only calls {@link HFile.Writer#append(KeyValue)}
+ * when records are emitted. It only supports writing data into a single column family. Records MUST be sorted
+ * by their column qualifier, then by timestamp in descending order. All data is written into a single HFile.
+ *
+ * HBase's official {@code HFileOutputFormat} is not used, because it shuffles on row-key only and
+ * does an in-memory sort on the reducer side (so the size of the output HFile is limited by the reducer's memory).
+ * As Crunch supports more complex and flexible MapReduce pipelines, we prefer a thin and pure
+ * {@code OutputFormat} here.
+ */
+public class HFileOutputFormatForCrunch extends FileOutputFormat<Object, KeyValue> {
+
+  private static final String COMPACTION_EXCLUDE_CONF_KEY =
+      "hbase.mapreduce.hfileoutputformat.compaction.exclude";
+  private static final String DATABLOCK_ENCODING_CONF_KEY =
+      "hbase.mapreduce.hfileoutputformat.datablock.encoding";
+  private static final String BLOCK_SIZE_CONF_KEY =
+      "hbase.mapreduce.hfileoutputformat.blocksize";
+  private static final String COMPRESSION_CONF_KEY = "hbase.hfileoutputformat.families.compression";
+  private final byte [] now = Bytes.toBytes(System.currentTimeMillis());
+  private final TimeRangeTracker trt = new TimeRangeTracker();
+
+  public RecordWriter<Object, KeyValue> getRecordWriter(final TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    Path outputPath = getDefaultWorkFile(context, "");
+    Configuration conf = context.getConfiguration();
+    FileSystem fs = outputPath.getFileSystem(conf);
+    int blocksize = conf.getInt(BLOCK_SIZE_CONF_KEY,
+        HFile.DEFAULT_BLOCKSIZE);
+    String compression = conf.get(
+        COMPRESSION_CONF_KEY, Compression.Algorithm.NONE.getName());
+    final boolean compactionExclude = conf.getBoolean(
+        COMPACTION_EXCLUDE_CONF_KEY, false);
+    HFileDataBlockEncoder encoder = getDataBlockEncoder(
+        conf.get(DATABLOCK_ENCODING_CONF_KEY));
+    final HFile.Writer writer = HFile.getWriterFactoryNoCache(conf)
+        .withPath(fs, outputPath)
+        .withBlockSize(blocksize)
+        .withCompression(compression)
+        .withComparator(KeyValue.KEY_COMPARATOR)
+        .withDataBlockEncoder(encoder)
+        .withChecksumType(Store.getChecksumType(conf))
+        .withBytesPerChecksum(Store.getBytesPerChecksum(conf))
+        .create();
+
+    return new RecordWriter<Object, KeyValue>() {
+      public void write(Object row, KeyValue kv)
+          throws IOException {
+        if (kv.getTimestamp() == HConstants.LATEST_TIMESTAMP) {
+          kv.updateLatestStamp(now);
+        }
+        writer.append(kv);
+        trt.includeTimestamp(kv);
+      }
+
+      public void close(TaskAttemptContext c)
+          throws IOException, InterruptedException {
+        writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
+            Bytes.toBytes(System.currentTimeMillis()));
+        writer.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
+            Bytes.toBytes(context.getTaskAttemptID().toString()));
+        writer.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
+            Bytes.toBytes(true));
+        writer.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
+            Bytes.toBytes(compactionExclude));
+        writer.appendFileInfo(StoreFile.TIMERANGE_KEY,
+            WritableUtils.toByteArray(trt));
+        writer.close();
+      }
+    };
+  }
+
+  private HFileDataBlockEncoder getDataBlockEncoder(String dataBlockEncodingStr) {
+    final HFileDataBlockEncoder encoder;
+    if (dataBlockEncodingStr == null) {
+      encoder = NoOpDataBlockEncoder.INSTANCE;
+    } else {
+      try {
+        encoder = new HFileDataBlockEncoderImpl(DataBlockEncoding
+            .valueOf(dataBlockEncodingStr));
+      } catch (IllegalArgumentException ex) {
+        throw new RuntimeException(
+            "Invalid data block encoding type configured for the param "
+                + DATABLOCK_ENCODING_CONF_KEY + " : "
+                + dataBlockEncodingStr);
+      }
+    }
+    return encoder;
+  }
+}
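
A minimal configuration sketch (not part of this patch; the concrete values are illustrative assumptions): HFileOutputFormatForCrunch reads its settings from the job Configuration, so a caller can tune block size, compression, data block encoding, and compaction exclusion through the string keys defined above.

import org.apache.hadoop.conf.Configuration;

public class HFileOutputConfigSketch {
  public static Configuration configure(Configuration conf) {
    // HFile block size in bytes (falls back to HFile.DEFAULT_BLOCKSIZE when unset).
    conf.setInt("hbase.mapreduce.hfileoutputformat.blocksize", 64 * 1024);
    // Compression algorithm name; defaults to "none" when unset.
    conf.set("hbase.hfileoutputformat.families.compression", "gz");
    // Optional data block encoding; must name a DataBlockEncoding constant,
    // e.g. "PREFIX", "DIFF" or "FAST_DIFF".
    conf.set("hbase.mapreduce.hfileoutputformat.datablock.encoding", "FAST_DIFF");
    // Mark the generated HFiles as excluded from minor compactions after bulk load.
    conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", true);
    return conf;
  }
}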

http://git-wip-us.apache.org/repos/asf/crunch/blob/92ea0592/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java
new file mode 100644
index 0000000..0038394
--- /dev/null
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.hbase;
+
+import org.apache.crunch.io.SequentialFileNamingScheme;
+import org.apache.crunch.io.impl.FileTargetImpl;
+import org.apache.hadoop.fs.Path;
+
+public class HFileTarget extends FileTargetImpl {
+
+  // TODO(chaoshi): configurable compression algorithm, block size, data block encoder for hfile...
+
+  public HFileTarget(String path) {
+    this(new Path(path));
+  }
+
+  public HFileTarget(Path path) {
+    super(path, HFileOutputFormatForCrunch.class, new SequentialFileNamingScheme());
+  }
+
+  @Override
+  public String toString() {
+    return "HFile(" + path + ")";
+  }
+}
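
A hedged usage sketch (caller code assumed, not part of this patch): a PCollection<KeyValue> that already satisfies the ordering contract of HFileOutputFormatForCrunch, for example after HFileUtils.sortAndPartition introduced later in this patch, can be written to an HFileTarget like any other Crunch target.

import org.apache.crunch.PCollection;
import org.apache.crunch.io.hbase.HFileTarget;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;

public class HFileTargetSketch {
  // outputDir and the column-family name "f" are hypothetical placeholders.
  public static void writeFamily(PCollection<KeyValue> sortedKvs, Path outputDir) {
    sortedKvs.write(new HFileTarget(new Path(outputDir, "f")));
  }
}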

http://git-wip-us.apache.org/repos/asf/crunch/blob/92ea0592/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
new file mode 100644
index 0000000..2235538
--- /dev/null
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.hbase;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.FilterFn;
+import org.apache.crunch.GroupingOptions;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.lib.sort.TotalOrderPartitioner;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.SequenceFile;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.crunch.types.writable.Writables.nulls;
+import static org.apache.crunch.types.writable.Writables.tableOf;
+import static org.apache.crunch.types.writable.Writables.writables;
+
+public final class HFileUtils {
+
+  private static final Log LOG = LogFactory.getLog(HFileUtils.class);
+
+  private static class FilterByFamilyFn extends FilterFn<KeyValue> {
+
+    private final byte[] family;
+
+    private FilterByFamilyFn(byte[] family) {
+      this.family = family;
+    }
+
+    @Override
+    public boolean accept(KeyValue input) {
+      return Bytes.equals(
+          input.getBuffer(), input.getFamilyOffset(), input.getFamilyLength(),
+          family, 0, family.length);
+    }
+  }
+
+  private static class KeyValueComparator implements RawComparator<KeyValue> {
+
+    @Override
+    public int compare(byte[] left, int loffset, int llength, byte[] right, int roffset, int rlength) {
+      // BytesWritable serializes its length in the first 4 bytes.
+      // We simply ignore it here, because KeyValue has its own size serialized.
+      if (llength < 4) {
+        throw new AssertionError("llength is too small: " + llength);
+      }
+      if (rlength < 4) {
+        throw new AssertionError("rlength is too small: " + rlength);
+      }
+      KeyValue leftKey = new KeyValue(left, loffset + 4, llength - 4);
+      KeyValue rightKey = new KeyValue(right, roffset + 4, rlength - 4);
+      return compare(leftKey, rightKey);
+    }
+
+    @Override
+    public int compare(KeyValue left, KeyValue right) {
+      return KeyValue.COMPARATOR.compare(left, right);
+    }
+  }
+
+  private HFileUtils() {}
+
+  public static void writeToHFilesForIncrementalLoad(
+      PCollection<KeyValue> kvs,
+      HTable table,
+      Path outputPath) throws IOException {
+    HColumnDescriptor[] families = table.getTableDescriptor().getColumnFamilies();
+    if (families.length == 0) {
+      LOG.warn(table + " has no column families");
+      return;
+    }
+    for (HColumnDescriptor f : families) {
+      byte[] family = f.getName();
+      PCollection<KeyValue> sorted = sortAndPartition(
+          kvs.filter(new FilterByFamilyFn(family)), table);
+      sorted.write(new HFileTarget(new Path(outputPath, Bytes.toString(family))));
+    }
+  }
+
+  public static PCollection<KeyValue> sortAndPartition(PCollection<KeyValue> kvs, HTable table) throws IOException {
+    Configuration conf = kvs.getPipeline().getConfiguration();
+    PTable<KeyValue, Void> t = kvs.parallelDo(new MapFn<KeyValue, Pair<KeyValue, Void>>() {
+      @Override
+      public Pair<KeyValue, Void> map(KeyValue input) {
+        return Pair.of(input, (Void) null);
+      }
+    }, tableOf(writables(KeyValue.class), nulls()));
+    List<KeyValue> splitPoints = getSplitPoints(table);
+    Path partitionFile = new Path(((MRPipeline) kvs.getPipeline()).createTempPath(), "partition");
+    writePartitionInfo(conf, partitionFile, splitPoints);
+    TotalOrderPartitioner.setPartitionFile(conf, partitionFile);
+    GroupingOptions options = GroupingOptions.builder()
+        .partitionerClass(TotalOrderPartitioner.class)
+        .numReducers(splitPoints.size() + 1)
+        .sortComparatorClass(KeyValueComparator.class)
+        .build();
+    return t.groupByKey(options).ungroup().keys();
+  }
+
+  private static List<KeyValue> getSplitPoints(HTable table) throws IOException {
+    List<byte[]> startKeys = ImmutableList.copyOf(table.getStartKeys());
+    if (startKeys.isEmpty()) {
+      throw new AssertionError(table + " has no regions!");
+    }
+    List<KeyValue> splitPoints = Lists.newArrayList();
+    for (byte[] startKey : startKeys.subList(1, startKeys.size())) {
+      KeyValue kv = KeyValue.createFirstOnRow(startKey);
+      LOG.debug("split row: " + Bytes.toString(kv.getRow()));
+      splitPoints.add(kv);
+    }
+    return splitPoints;
+  }
+
+  private static void writePartitionInfo(
+      Configuration conf,
+      Path path,
+      List<KeyValue> splitPoints) throws IOException {
+    LOG.info("Writing " + splitPoints.size() + " split points to " + path);
+    SequenceFile.Writer writer = SequenceFile.createWriter(
+        path.getFileSystem(conf),
+        conf,
+        path,
+        NullWritable.class,
+        KeyValue.class);
+    for (KeyValue key : splitPoints) {
+      writer.append(NullWritable.get(), writables(KeyValue.class).getOutputMapFn().map(key));
+    }
+    writer.close();
+  }
+}
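
A minimal end-to-end sketch (the surrounding pipeline and the KeyValue source are assumptions, not part of this patch): writeToHFilesForIncrementalLoad filters the incoming KeyValues by column family, sorts and partitions each slice to match the table's regions, and writes one HFile directory per family under outputPath, ready for HBase's bulk-load tooling.

import org.apache.crunch.PCollection;
import org.apache.crunch.io.hbase.HFileUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;

public class IncrementalLoadSketch {
  public static void bulkPrepare(PCollection<KeyValue> kvs, HTable table, Path outputPath)
      throws Exception {
    // Produces one sub-directory of HFiles per column family of 'table'.
    HFileUtils.writeToHFilesForIncrementalLoad(kvs, table, outputPath);
    // Run the pipeline; the resulting directories can then be handed to
    // HBase's LoadIncrementalHFiles (completebulkload) tool.
    kvs.getPipeline().done();
  }
}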

http://git-wip-us.apache.org/repos/asf/crunch/blob/92ea0592/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/ToHBase.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/ToHBase.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/ToHBase.java
index fa6b1a3..2c53ae1 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/ToHBase.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/ToHBase.java
@@ -18,6 +18,7 @@
 package org.apache.crunch.io.hbase;
 
 import org.apache.crunch.Target;
+import org.apache.hadoop.fs.Path;
 
 /**
  * Static factory methods for creating HBase {@link Target} types.
@@ -28,4 +29,11 @@ public class ToHBase {
     return new HBaseTarget(table);
   }
 
+  public static Target hfile(String path) {
+    return new HFileTarget(path);
+  }
+
+  public static Target hfile(Path path) {
+    return new HFileTarget(path);
+  }
 }
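
A small sketch of the new factory methods (caller code assumed, not part of this patch): ToHBase.hfile simply delegates to the HFileTarget constructors, so a suitably sorted PCollection<KeyValue> can be written without referencing the target class directly.

import org.apache.crunch.PCollection;
import org.apache.crunch.io.hbase.ToHBase;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;

public class ToHBaseHFileSketch {
  public static void write(PCollection<KeyValue> sortedKvs, Path dir) {
    // Equivalent to sortedKvs.write(new HFileTarget(dir)).
    sortedKvs.write(ToHBase.hfile(dir));
  }
}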

