From: chaoshi@apache.org
To: commits@crunch.apache.org
Reply-To: dev@crunch.apache.org
Date: Fri, 02 Aug 2013 15:20:57 -0000
Subject: [1/3] CRUNCH-212: Target wrapper for HFileOutputFormat

Updated Branches:
  refs/heads/master d4a06967e -> 92ea0592f


http://git-wip-us.apache.org/repos/asf/crunch/blob/92ea0592/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java
new file mode 100644
index 0000000..311d91c
--- /dev/null
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java
@@ -0,0 +1,133 @@
+/**
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
+import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
+import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import java.io.IOException;
+
+/**
+ * This is a thin wrapper around {@link HFile.Writer}: it only calls {@link HFile.Writer#append(byte[], byte[])}
+ * as records are emitted. It supports writing data into a single column family only. Records MUST be sorted
+ * by their column qualifier, then by timestamp in descending order. All data is written into a single HFile.
+ *
+ * HBase's official {@code HFileOutputFormat} is not used here, because it shuffles on the row key only and
+ * sorts in memory on the reducer side (so the size of the output HFile is limited by the reducer's memory).
+ * Since Crunch supports more complex and flexible MapReduce pipelines, a thin and minimal
+ * {@code OutputFormat} is preferable.
+ */
+public class HFileOutputFormatForCrunch extends FileOutputFormat<Object, KeyValue> {
+
+  private static final String COMPACTION_EXCLUDE_CONF_KEY =
+      "hbase.mapreduce.hfileoutputformat.compaction.exclude";
+  private static final String DATABLOCK_ENCODING_CONF_KEY =
+      "hbase.mapreduce.hfileoutputformat.datablock.encoding";
+  private static final String BLOCK_SIZE_CONF_KEY =
+      "hbase.mapreduce.hfileoutputformat.blocksize";
+  private static final String COMPRESSION_CONF_KEY = "hbase.hfileoutputformat.families.compression";
+
+  private final byte[] now = Bytes.toBytes(System.currentTimeMillis());
+  private final TimeRangeTracker trt = new TimeRangeTracker();
+
+  public RecordWriter<Object, KeyValue> getRecordWriter(final TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    Path outputPath = getDefaultWorkFile(context, "");
+    Configuration conf = context.getConfiguration();
+    FileSystem fs = outputPath.getFileSystem(conf);
+    int blocksize = conf.getInt(BLOCK_SIZE_CONF_KEY,
+        HFile.DEFAULT_BLOCKSIZE);
+    String compression = conf.get(
+        COMPRESSION_CONF_KEY, Compression.Algorithm.NONE.getName());
+    final boolean compactionExclude = conf.getBoolean(
+        COMPACTION_EXCLUDE_CONF_KEY, false);
+    HFileDataBlockEncoder encoder = getDataBlockEncoder(
+        conf.get(DATABLOCK_ENCODING_CONF_KEY));
+    final HFile.Writer writer = HFile.getWriterFactoryNoCache(conf)
+        .withPath(fs, outputPath)
+        .withBlockSize(blocksize)
+        .withCompression(compression)
+        .withComparator(KeyValue.KEY_COMPARATOR)
+        .withDataBlockEncoder(encoder)
+        .withChecksumType(Store.getChecksumType(conf))
+        .withBytesPerChecksum(Store.getBytesPerChecksum(conf))
+        .create();
+
+    return new RecordWriter<Object, KeyValue>() {
+
+      public void write(Object row, KeyValue kv)
+          throws IOException {
+        // Cells emitted with LATEST_TIMESTAMP get the writer's start time instead.
+        if (kv.getTimestamp() == HConstants.LATEST_TIMESTAMP) {
+          kv.updateLatestStamp(now);
+        }
+        writer.append(kv);
+        trt.includeTimestamp(kv);
+      }
+
+      public void close(TaskAttemptContext c)
+          throws IOException, InterruptedException {
+        // Record bulk-load metadata so HBase treats the file like a compacted store file.
+        writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
+            Bytes.toBytes(System.currentTimeMillis()));
+        writer.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
+            Bytes.toBytes(context.getTaskAttemptID().toString()));
+        writer.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
+            Bytes.toBytes(true));
+        writer.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
+            Bytes.toBytes(compactionExclude));
+        writer.appendFileInfo(StoreFile.TIMERANGE_KEY,
+            WritableUtils.toByteArray(trt));
+        writer.close();
+      }
+    };
+  }
+
+  private HFileDataBlockEncoder getDataBlockEncoder(String dataBlockEncodingStr) {
+    final HFileDataBlockEncoder encoder;
+    if (dataBlockEncodingStr == null) {
+      encoder = NoOpDataBlockEncoder.INSTANCE;
+    } else {
+      try {
+        encoder = new HFileDataBlockEncoderImpl(DataBlockEncoding
+            .valueOf(dataBlockEncodingStr));
+      } catch (IllegalArgumentException ex) {
+        throw new RuntimeException(
+            "Invalid data block encoding type configured for the param "
+                + DATABLOCK_ENCODING_CONF_KEY + " : " + dataBlockEncodingStr);
+      }
+    }
+    return encoder;
+  }
+}
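All of the writer's tuning knobs above are read from the job Configuration, so a pipeline can
adjust them without touching this class. A minimal, hypothetical driver-side sketch follows:
the key strings are the ones consumed in getRecordWriter() above, while the helper class name
and the example values are illustrative only.

import org.apache.crunch.Pipeline;
import org.apache.hadoop.conf.Configuration;

// Hypothetical helper; not part of this patch.
public final class HFileOutputConfig {
  public static void apply(Pipeline pipeline) {
    Configuration conf = pipeline.getConfiguration();
    conf.setInt("hbase.mapreduce.hfileoutputformat.blocksize", 64 * 1024);
    conf.set("hbase.hfileoutputformat.families.compression", "gz");            // an HBase Compression.Algorithm name
    conf.set("hbase.mapreduce.hfileoutputformat.datablock.encoding", "PREFIX"); // a DataBlockEncoding enum name
    conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", true);
  }
}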
http://git-wip-us.apache.org/repos/asf/crunch/blob/92ea0592/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java
new file mode 100644
index 0000000..0038394
--- /dev/null
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.hbase;
+
+import org.apache.crunch.io.SequentialFileNamingScheme;
+import org.apache.crunch.io.impl.FileTargetImpl;
+import org.apache.hadoop.fs.Path;
+
+public class HFileTarget extends FileTargetImpl {
+
+  // TODO(chaoshi): configurable compression algorithm, block size, data block encoder for hfile...
+
+  public HFileTarget(String path) {
+    this(new Path(path));
+  }
+
+  public HFileTarget(Path path) {
+    super(path, HFileOutputFormatForCrunch.class, new SequentialFileNamingScheme());
+  }
+
+  @Override
+  public String toString() {
+    return "HFile(" + path + ")";
+  }
+}
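For context (not part of the patch), a target like this would typically receive a
PCollection of KeyValues that already satisfies the ordering contract documented on
HFileOutputFormatForCrunch. A hypothetical usage sketch; the method name and output
path are made up:

import org.apache.crunch.PCollection;
import org.apache.crunch.io.hbase.HFileTarget;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;

public final class HFileTargetExample {
  public static void writeSorted(PCollection<KeyValue> sortedKvs) {
    // The KeyValues must already be sorted and belong to a single column
    // family; the output directory below is illustrative.
    sortedKvs.write(new HFileTarget(new Path("/tmp/bulkload/cf1")));
  }
}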
http://git-wip-us.apache.org/repos/asf/crunch/blob/92ea0592/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
new file mode 100644
index 0000000..2235538
--- /dev/null
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.hbase;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.FilterFn;
+import org.apache.crunch.GroupingOptions;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.lib.sort.TotalOrderPartitioner;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.SequenceFile;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.crunch.types.writable.Writables.nulls;
+import static org.apache.crunch.types.writable.Writables.tableOf;
+import static org.apache.crunch.types.writable.Writables.writables;
+
+public final class HFileUtils {
+
+  private static final Log LOG = LogFactory.getLog(HFileUtils.class);
+
+  private static class FilterByFamilyFn extends FilterFn<KeyValue> {
+
+    private final byte[] family;
+
+    private FilterByFamilyFn(byte[] family) {
+      this.family = family;
+    }
+
+    @Override
+    public boolean accept(KeyValue input) {
+      return Bytes.equals(
+          input.getBuffer(), input.getFamilyOffset(), input.getFamilyLength(),
+          family, 0, family.length);
+    }
+  }
+
+  private static class KeyValueComparator implements RawComparator<KeyValue> {
+
+    @Override
+    public int compare(byte[] left, int loffset, int llength, byte[] right, int roffset, int rlength) {
+      // BytesWritable serializes its length in the first 4 bytes. We simply
+      // skip it here, because KeyValue already serializes its own length.
+      if (llength < 4) {
+        throw new AssertionError("Too small llength: " + llength);
+      }
+      if (rlength < 4) {
+        throw new AssertionError("Too small rlength: " + rlength);
+      }
+      KeyValue leftKey = new KeyValue(left, loffset + 4, llength - 4);
+      KeyValue rightKey = new KeyValue(right, roffset + 4, rlength - 4);
+      return compare(leftKey, rightKey);
+    }
+
+    @Override
+    public int compare(KeyValue left, KeyValue right) {
+      return KeyValue.COMPARATOR.compare(left, right);
+    }
+  }
+
+  private HFileUtils() {}
+
+  public static void writeToHFilesForIncrementalLoad(
+      PCollection<KeyValue> kvs,
+      HTable table,
+      Path outputPath) throws IOException {
+    HColumnDescriptor[] families = table.getTableDescriptor().getColumnFamilies();
+    if (families.length == 0) {
+      LOG.warn(table + " has no column families");
+      return;
+    }
+    for (HColumnDescriptor f : families) {
+      byte[] family = f.getName();
+      PCollection<KeyValue> sorted = sortAndPartition(
+          kvs.filter(new FilterByFamilyFn(family)), table);
+      sorted.write(new HFileTarget(new Path(outputPath, Bytes.toString(family))));
+    }
+  }
+
+  public static PCollection<KeyValue> sortAndPartition(PCollection<KeyValue> kvs, HTable table) throws IOException {
+    Configuration conf = kvs.getPipeline().getConfiguration();
+    PTable<KeyValue, Void> t = kvs.parallelDo(new MapFn<KeyValue, Pair<KeyValue, Void>>() {
+      @Override
+      public Pair<KeyValue, Void> map(KeyValue input) {
+        return Pair.of(input, (Void) null);
+      }
+    }, tableOf(writables(KeyValue.class), nulls()));
+    List<KeyValue> splitPoints = getSplitPoints(table);
+    Path partitionFile = new Path(((MRPipeline) kvs.getPipeline()).createTempPath(), "partition");
+    writePartitionInfo(conf, partitionFile, splitPoints);
+    TotalOrderPartitioner.setPartitionFile(conf, partitionFile);
+    GroupingOptions options = GroupingOptions.builder()
+        .partitionerClass(TotalOrderPartitioner.class)
+        .numReducers(splitPoints.size() + 1)
+        .sortComparatorClass(KeyValueComparator.class)
+        .build();
+    return t.groupByKey(options).ungroup().keys();
+  }
+
+  private static List<KeyValue> getSplitPoints(HTable table) throws IOException {
+    List<byte[]> startKeys = ImmutableList.copyOf(table.getStartKeys());
+    if (startKeys.isEmpty()) {
+      throw new AssertionError(table + " has no regions!");
+    }
+    List<KeyValue> splitPoints = Lists.newArrayList();
+    // The first region's start key is empty; each remaining start key becomes a split point.
+    for (byte[] startKey : startKeys.subList(1, startKeys.size())) {
+      KeyValue kv = KeyValue.createFirstOnRow(startKey);
+      LOG.debug("split row: " + Bytes.toString(kv.getRow()));
+      splitPoints.add(kv);
+    }
+    return splitPoints;
+  }
+
+  private static void writePartitionInfo(
+      Configuration conf,
+      Path path,
+      List<KeyValue> splitPoints) throws IOException {
+    LOG.info("Writing " + splitPoints.size() + " split points to " + path);
+    SequenceFile.Writer writer = SequenceFile.createWriter(
+        path.getFileSystem(conf),
+        conf,
+        path,
+        NullWritable.class,
+        KeyValue.class);
+    for (KeyValue key : splitPoints) {
+      writer.append(NullWritable.get(), writables(KeyValue.class).getOutputMapFn().map(key));
+    }
+    writer.close();
+  }
+}
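Putting the pieces together, a caller might drive the whole incremental-load flow as
sketched below. Assumptions worth flagging: an MRPipeline is required because
sortAndPartition() obtains a temp path through it, and the class name, table name,
output path, and readCells() helper are all hypothetical.

import org.apache.crunch.PCollection;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.hbase.HFileUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;

// Hypothetical driver; see the assumptions noted above.
public final class BulkLoadDriver {
  public static void main(String[] args) throws Exception {
    MRPipeline pipeline = new MRPipeline(BulkLoadDriver.class);
    PCollection<KeyValue> kvs = readCells(pipeline);  // application-specific source of cells
    HTable table = new HTable(pipeline.getConfiguration(), "my_table");
    // Filters, sorts, and partitions the cells per column family, then writes
    // one HFileTarget per family under /tmp/hfiles.
    HFileUtils.writeToHFilesForIncrementalLoad(kvs, table, new Path("/tmp/hfiles"));
    pipeline.done();
    // The per-family directories under /tmp/hfiles can then be handed to
    // HBase's LoadIncrementalHFiles ("completebulkload") tool.
  }

  private static PCollection<KeyValue> readCells(MRPipeline pipeline) {
    throw new UnsupportedOperationException("application-specific");
  }
}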
http://git-wip-us.apache.org/repos/asf/crunch/blob/92ea0592/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/ToHBase.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/ToHBase.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/ToHBase.java
index fa6b1a3..2c53ae1 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/ToHBase.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/ToHBase.java
@@ -18,6 +18,7 @@ package org.apache.crunch.io.hbase;
 
 import org.apache.crunch.Target;
+import org.apache.hadoop.fs.Path;
 
 /**
  * Static factory methods for creating HBase {@link Target} types.
@@ -28,4 +29,11 @@ public class ToHBase {
     return new HBaseTarget(table);
   }
 
+  public static Target hfile(String path) {
+    return new HFileTarget(path);
+  }
+
+  public static Target hfile(Path path) {
+    return new HFileTarget(path);
+  }
 }
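With these factory methods in place, the write in the earlier HFileTarget sketch could
equally go through ToHBase; both overloads construct the same HFileTarget. The
collection name and paths below are illustrative, as before. Either form works:

// Equivalent to sortedKvs.write(new HFileTarget(...)).
sortedKvs.write(ToHBase.hfile("/tmp/bulkload/cf1"));
sortedKvs.write(ToHBase.hfile(new Path("/tmp/bulkload/cf1")));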