crunch-commits mailing list archives

From chao...@apache.org
Subject git commit: CRUNCH-246: HFileSource and related utilities (Thanks Ryan Brush for contributing HFileInputFormat)
Date Mon, 02 Sep 2013 06:04:47 GMT
Updated Branches:
  refs/heads/master d91390d6c -> 71e678300


CRUNCH-246: HFileSource and related utilities (Thanks Ryan Brush for contributing HFileInputFormat)


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/71e67830
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/71e67830
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/71e67830

Branch: refs/heads/master
Commit: 71e67830076e328c81ec84ba745f0b8466752909
Parents: d91390d
Author: Chao Shi <chaoshi@apache.org>
Authored: Mon Sep 2 12:22:00 2013 +0800
Committer: Chao Shi <chaoshi@apache.org>
Committed: Mon Sep 2 12:22:00 2013 +0800

----------------------------------------------------------------------
 .../apache/crunch/io/hbase/HFileSourceIT.java   | 229 ++++++++++++++
 .../org/apache/crunch/io/hbase/FromHBase.java   |   9 +
 .../crunch/io/hbase/HFileInputFormat.java       | 220 ++++++++++++++
 .../org/apache/crunch/io/hbase/HFileSource.java |  76 +++++
 .../org/apache/crunch/io/hbase/HFileUtils.java  | 302 ++++++++++++++++++-
 5 files changed, 835 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
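
For reference, a minimal usage sketch of the new API added by this commit (the HFile path and
row keys below are made up for illustration; the calls mirror FromHBase.hfile and
HFileUtils.scanHFiles from the diff):

import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.hbase.FromHBase;
import org.apache.crunch.io.hbase.HFileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class HFileReadSketch {
  public static void main(String[] args) {
    Pipeline pipeline = new MRPipeline(HFileReadSketch.class, new Configuration());

    // Read the raw KeyValues from a directory of HFiles (hypothetical path).
    PCollection<KeyValue> kvs = pipeline.read(FromHBase.hfile(new Path("/hfiles/mytable")));

    // Or read per-row Results, with basic Scan filtering applied on the Crunch side.
    Scan scan = new Scan();
    scan.setStartRow(Bytes.toBytes("row100"));
    scan.setStopRow(Bytes.toBytes("row200"));
    PCollection<Result> rows = HFileUtils.scanHFiles(pipeline, new Path("/hfiles/mytable"), scan);

    // Downstream processing would go here; see HFileSourceIT below for a full example.
    pipeline.done();
  }
}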


http://git-wip-us.apache.org/repos/asf/crunch/blob/71e67830/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileSourceIT.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileSourceIT.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileSourceIT.java
new file mode 100644
index 0000000..61e7663
--- /dev/null
+++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileSourceIT.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.hbase;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.PipelineResult;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.To;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.crunch.types.writable.Writables.strings;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class HFileSourceIT implements Serializable {
+
+  private static byte[] ROW1 = Bytes.toBytes("row1");
+  private static byte[] ROW2 = Bytes.toBytes("row2");
+  private static byte[] ROW3 = Bytes.toBytes("row3");
+  private static byte[] FAMILY1 = Bytes.toBytes("family1");
+  private static byte[] FAMILY2 = Bytes.toBytes("family2");
+  private static byte[] FAMILY3 = Bytes.toBytes("family3");
+  private static byte[] QUALIFIER1 = Bytes.toBytes("qualifier1");
+  private static byte[] QUALIFIER2 = Bytes.toBytes("qualifier2");
+  private static byte[] QUALIFIER3 = Bytes.toBytes("qualifier3");
+  private static byte[] QUALIFIER4 = Bytes.toBytes("qualifier4");
+  private static byte[] VALUE1 = Bytes.toBytes("value1");
+  private static byte[] VALUE2 = Bytes.toBytes("value2");
+  private static byte[] VALUE3 = Bytes.toBytes("value3");
+  private static byte[] VALUE4 = Bytes.toBytes("value4");
+
+  @Rule
+  public transient TemporaryPath tmpDir = TemporaryPaths.create();
+  private transient Configuration conf;
+
+  @Before
+  public void setUp() {
+    conf = tmpDir.getDefaultConfiguration();
+  }
+
+  @Test
+  public void testHFileSource() throws IOException {
+    List<KeyValue> kvs = generateKeyValues(100);
+    Path inputPath = tmpDir.getPath("in");
+    Path outputPath = tmpDir.getPath("out");
+    writeKeyValuesToHFile(inputPath, kvs);
+
+    Pipeline pipeline = new MRPipeline(HFileSourceIT.class, conf);
+    PCollection<KeyValue> in = pipeline.read(FromHBase.hfile(inputPath));
+    PCollection<String> texts = in.parallelDo(new MapFn<KeyValue, String>() {
+      @Override
+      public String map(KeyValue input) {
+        return input.toString();
+      }
+    }, strings());
+    texts.write(To.textFile(outputPath));
+    PipelineResult result = pipeline.run();
+    assertTrue(result.succeeded());
+
+    List<String> lines = FileUtils.readLines(new File(outputPath.toString(), "part-m-00000"));
+    assertEquals(kvs.size(), lines.size());
+    for (int i = 0; i < kvs.size(); i++) {
+      assertEquals(kvs.get(i).toString(), lines.get(i));
+    }
+  }
+
+  @Test
+  public void testScanHFiles() throws IOException {
+    List<KeyValue> kvs = ImmutableList.of(
+        new KeyValue(ROW1, FAMILY1, QUALIFIER1, 0, VALUE1),
+        new KeyValue(ROW1, FAMILY1, QUALIFIER2, 0, VALUE2));
+    List<Result> results = doTestScanHFiles(kvs, new Scan());
+    assertEquals(1, results.size());
+    Result result = Iterables.getOnlyElement(results);
+    assertArrayEquals(ROW1, result.getRow());
+    assertEquals(2, result.raw().length);
+    assertArrayEquals(VALUE1, result.getColumnLatest(FAMILY1, QUALIFIER1).getValue());
+    assertArrayEquals(VALUE2, result.getColumnLatest(FAMILY1, QUALIFIER2).getValue());
+  }
+
+  @Test
+  public void testScanHFiles_maxVersions() throws IOException {
+    List<KeyValue> kvs = ImmutableList.of(
+        new KeyValue(ROW1, FAMILY1, QUALIFIER1, 1, VALUE1),
+        new KeyValue(ROW1, FAMILY1, QUALIFIER1, 3, VALUE3),
+        new KeyValue(ROW1, FAMILY1, QUALIFIER1, 2, VALUE2));
+    Scan scan = new Scan();
+    scan.setMaxVersions(2);
+    List<Result> results = doTestScanHFiles(kvs, scan);
+    assertEquals(1, results.size());
+    Result result = Iterables.getOnlyElement(results);
+    List<KeyValue> kvs2 = result.getColumn(FAMILY1, QUALIFIER1);
+    assertEquals(3, kvs2.size());
+    assertArrayEquals(VALUE3, kvs2.get(0).getValue());
+    assertArrayEquals(VALUE2, kvs2.get(1).getValue());
+    assertArrayEquals(VALUE1, kvs2.get(2).getValue());
+  }
+
+  @Test
+  public void testScanHFiles_startStopRows() throws IOException {
+    List<KeyValue> kvs = ImmutableList.of(
+        new KeyValue(ROW1, FAMILY1, QUALIFIER1, 0, VALUE1),
+        new KeyValue(ROW2, FAMILY1, QUALIFIER1, 0, VALUE1),
+        new KeyValue(ROW3, FAMILY1, QUALIFIER1, 0, VALUE1));
+    Scan scan = new Scan();
+    scan.setStartRow(ROW2);
+    scan.setStopRow(ROW3);
+    List<Result> results = doTestScanHFiles(kvs, scan);
+    assertEquals(1, results.size());
+    Result result = Iterables.getOnlyElement(results);
+    assertArrayEquals(ROW2, result.getRow());
+  }
+
+  @Test
+  public void testScanHFiles_familyMap() throws IOException {
+    List<KeyValue> kvs = ImmutableList.of(
+        new KeyValue(ROW1, FAMILY1, QUALIFIER1, 0, VALUE1),
+        new KeyValue(ROW1, FAMILY2, QUALIFIER2, 0, VALUE2),
+        new KeyValue(ROW1, FAMILY2, QUALIFIER3, 0, VALUE3),
+        new KeyValue(ROW1, FAMILY3, QUALIFIER4, 0, VALUE4));
+    Scan scan = new Scan();
+    scan.addFamily(FAMILY1);
+    scan.addColumn(FAMILY2, QUALIFIER2);
+    List<Result> results = doTestScanHFiles(kvs, scan);
+    assertEquals(1, results.size());
+    Result result = Iterables.getOnlyElement(results);
+    assertEquals(2, result.size());
+    assertNotNull(result.getColumnLatest(FAMILY1, QUALIFIER1));
+    assertNotNull(result.getColumnLatest(FAMILY2, QUALIFIER2));
+  }
+
+  @Test
+  public void testScanHFiles_timeRange() throws IOException {
+    List<KeyValue> kvs = ImmutableList.of(
+        new KeyValue(ROW1, FAMILY1, QUALIFIER1, 1, VALUE1),
+        new KeyValue(ROW1, FAMILY1, QUALIFIER2, 2, VALUE2),
+        new KeyValue(ROW1, FAMILY1, QUALIFIER2, 3, VALUE3));
+    Scan scan = new Scan();
+    scan.setTimeRange(2, 3);
+    List<Result> results = doTestScanHFiles(kvs, scan);
+    assertEquals(1, results.size());
+    Result result = Iterables.getOnlyElement(results);
+    assertEquals(1, result.size());
+    assertNotNull(result.getColumnLatest(FAMILY1, QUALIFIER2));
+  }
+
+  private List<Result> doTestScanHFiles(List<KeyValue> kvs, Scan scan) throws IOException {
+    Path inputPath = tmpDir.getPath("in");
+    writeKeyValuesToHFile(inputPath, kvs);
+
+    Pipeline pipeline = new MRPipeline(HFileSourceIT.class, conf);
+    PCollection<Result> results = HFileUtils.scanHFiles(pipeline, inputPath, scan);
+    return ImmutableList.copyOf(results.materialize());
+  }
+
+  private List<KeyValue> generateKeyValues(int count) {
+    List<KeyValue> kvs = Lists.newArrayList();
+    for (int i = 0; i < count; i++) {
+      kvs.add(new KeyValue(
+          Bytes.toBytes("row_" + i),
+          Bytes.toBytes("family"),
+          Bytes.toBytes("qualifier_" + i)));
+    }
+    Collections.sort(kvs, KeyValue.COMPARATOR);
+    return kvs;
+  }
+
+  private Path writeKeyValuesToHFile(Path inputPath, List<KeyValue> kvs) throws IOException {
+    HFile.Writer w = null;
+    try {
+      List<KeyValue> sortedKVs = Lists.newArrayList(kvs);
+      Collections.sort(sortedKVs, KeyValue.COMPARATOR);
+      FileSystem fs = FileSystem.get(conf);
+      w = HFile.getWriterFactory(conf, new CacheConfig(conf))
+          .withPath(fs, inputPath)
+          .withComparator(KeyValue.KEY_COMPARATOR)
+          .create();
+      for (KeyValue kv : sortedKVs) {
+        w.append(kv);
+      }
+      return inputPath;
+    } finally {
+      IOUtils.closeQuietly(w);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/71e67830/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/FromHBase.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/FromHBase.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/FromHBase.java
index 221de9b..18d5a95 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/FromHBase.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/FromHBase.java
@@ -19,6 +19,8 @@ package org.apache.crunch.io.hbase;
 
 import org.apache.crunch.Source;
 import org.apache.crunch.TableSource;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -36,4 +38,11 @@ public class FromHBase {
     return new HBaseSourceTarget(table, scan);
   }
 
+  public static Source<KeyValue> hfile(String path) {
+    return hfile(new Path(path));
+  }
+
+  public static Source<KeyValue> hfile(Path path) {
+    return new HFileSource(path);
+  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/71e67830/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileInputFormat.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileInputFormat.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileInputFormat.java
new file mode 100644
index 0000000..07b4b15
--- /dev/null
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileInputFormat.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.hbase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.codec.DecoderException;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.FixedFileTrailer;
+import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
+import org.apache.hadoop.hbase.io.hfile.HFileReaderV2;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Simple input format for HFiles.
+ */
+public class HFileInputFormat extends FileInputFormat<NullWritable, KeyValue> {
+
+  private static final Log LOG = LogFactory.getLog(HFileInputFormat.class);
+  static final String START_ROW_KEY = "crunch.hbase.hfile.input.format.start.row";
+  static final String STOP_ROW_KEY = "crunch.hbase.hfile.input.format.stop.row";
+
+  /**
+   * File filter that removes all "hidden" files. This might be something worth removing from
+   * a more general purpose utility; it accounts for the presence of metadata files created
+   * in the way we're doing exports.
+   */
+  private static final PathFilter HIDDEN_FILE_FILTER = new PathFilter() {
+    public boolean accept(Path p) {
+      String name = p.getName();
+      return !name.startsWith("_") && !name.startsWith(".");
+    }
+  };
+
+  /**
+   * Record reader for HFiles.
+   */
+  private static class HFileRecordReader extends RecordReader<NullWritable, KeyValue> {
+
+    private Reader in;
+    protected Configuration conf;
+    private HFileScanner scanner;
+
+    /**
+     * A private cache of the key value so it doesn't need to be loaded twice from the scanner.
+     */
+    private KeyValue value = null;
+    private byte[] startRow = null;
+    private byte[] stopRow = null;
+    private boolean reachedStopRow = false;
+    private long count;
+
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+      FileSplit fileSplit = (FileSplit) split;
+      conf = context.getConfiguration();
+      Path path = fileSplit.getPath();
+      FileSystem fs = path.getFileSystem(conf);
+
+      long fileSize = fs.getFileStatus(path).getLen();
+
+      // Open the underlying input stream; it will be closed by the HFileReader below.
+      FSDataInputStream iStream = fs.open(path);
+      FixedFileTrailer fileTrailer = FixedFileTrailer.readFromStream(iStream, fileSize);
+
+      // If this class is generalized, it may need to account for different data block encodings.
+      this.in = new HFileReaderV2(path, fileTrailer, iStream, iStream, fileSize, true, new CacheConfig(conf),
+          DataBlockEncoding.NONE, new HFileSystem(fs));
+
+      // The file info must be loaded before the scanner can be used.
+      // This seems like a bug in HBase, but it's easily worked around.
+      this.in.loadFileInfo();
+      this.scanner = in.getScanner(false, false);
+
+      String startRowStr = conf.get(START_ROW_KEY);
+      if (startRowStr != null) {
+        this.startRow = decodeHexOrDie(startRowStr);
+      }
+      String stopRowStr = conf.get(STOP_ROW_KEY);
+      if (stopRowStr != null) {
+        this.stopRow = decodeHexOrDie(stopRowStr);
+      }
+    }
+
+    private static byte[] decodeHexOrDie(String s) {
+      try {
+        return Hex.decodeHex(s.toCharArray());
+      } catch (DecoderException e) {
+        throw new AssertionError("Failed to decode hex string: " + s);
+      }
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      if (reachedStopRow) {
+        return false;
+      }
+      boolean hasNext;
+      if (!scanner.isSeeked()) {
+        if (startRow != null) {
+          LOG.info("Seeking to start row " + Bytes.toStringBinary(startRow));
+          KeyValue kv = KeyValue.createFirstOnRow(startRow);
+          hasNext = (scanner.seekTo(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength()) >= 0);
+        } else {
+          LOG.info("Seeking to start");
+          hasNext = scanner.seekTo();
+        }
+      } else {
+        hasNext = scanner.next();
+      }
+      if (!hasNext) {
+        return false;
+      }
+      value = scanner.getKeyValue();
+      if (stopRow != null && Bytes.compareTo(
+          value.getBuffer(), value.getRowOffset(), value.getRowLength(),
+          stopRow, 0, stopRow.length) >= 0) {
+        LOG.info("Reached stop row " + Bytes.toStringBinary(stopRow));
+        reachedStopRow = true;
+        value = null;
+        return false;
+      }
+      count++;
+      return true;
+    }
+
+    @Override
+    public NullWritable getCurrentKey() throws IOException, InterruptedException {
+      return NullWritable.get();
+    }
+
+    @Override
+    public KeyValue getCurrentValue() throws IOException, InterruptedException {
+      return value;
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+      // This would be inaccurate if KVs are not uniformly-sized or we have performed a seek to
+      // the start row, but better than nothing anyway.
+      return 1.0f * count / in.getEntries();
+    }
+
+    @Override
+    public void close() throws IOException {
+      in.close();
+    }
+  }
+
+  @Override
+  protected List<FileStatus> listStatus(JobContext job) throws IOException {
+    List<FileStatus> result = new ArrayList<FileStatus>();
+
+    FileSystem fs = FileSystem.get(job.getConfiguration());
+
+    // Explode out directories that match the original FileInputFormat filters, since HFiles
+    // are written to directories where the directory name is the column family name.
+    for (FileStatus status : super.listStatus(job)) {
+      if (status.isDir()) {
+        for (FileStatus match : fs.listStatus(status.getPath(), HIDDEN_FILE_FILTER)) {
+          result.add(match);
+        }
+      } else {
+        result.add(status);
+      }
+    }
+
+    return result;
+  }
+
+  @Override
+  public RecordReader<NullWritable, KeyValue> createRecordReader(InputSplit split, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    return new HFileRecordReader();
+  }
+
+  @Override
+  protected boolean isSplitable(JobContext context, Path filename) {
+    // This file isn't splittable.
+    return false;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/71e67830/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileSource.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileSource.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileSource.java
new file mode 100644
index 0000000..13137b8
--- /dev/null
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileSource.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.hbase;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.crunch.io.FormatBundle;
+import org.apache.crunch.io.ReadableSource;
+import org.apache.crunch.io.impl.FileSourceImpl;
+import org.apache.crunch.types.PType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.crunch.types.writable.Writables.writables;
+
+public class HFileSource extends FileSourceImpl<KeyValue> implements ReadableSource<KeyValue> {
+
+  private static final PType<KeyValue> KEY_VALUE_PTYPE = writables(KeyValue.class);
+
+  public HFileSource(Path path) {
+    this(ImmutableList.of(path));
+  }
+
+  public HFileSource(List<Path> paths) {
+    this(paths, new Scan());
+  }
+
+  // Package-local. We don't want it to be too open, because we only support limited filters so far
+  // (namely start/stop row). Users who need advanced filters should use HFileUtils#scanHFiles.
+  HFileSource(List<Path> paths, Scan scan) {
+    super(paths, KEY_VALUE_PTYPE, createInputFormatBundle(scan));
+  }
+
+  private static FormatBundle<HFileInputFormat> createInputFormatBundle(Scan scan) {
+    FormatBundle<HFileInputFormat> bundle = FormatBundle.forInput(HFileInputFormat.class);
+    if (!Objects.equals(scan.getStartRow(), HConstants.EMPTY_START_ROW)) {
+      bundle.set(HFileInputFormat.START_ROW_KEY, Hex.encodeHexString(scan.getStartRow()));
+    }
+    if (!Objects.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) {
+      bundle.set(HFileInputFormat.STOP_ROW_KEY, Hex.encodeHexString(scan.getStopRow()));
+    }
+    return bundle;
+  }
+
+  @Override
+  public Iterable<KeyValue> read(Configuration conf) throws IOException {
+    throw new UnsupportedOperationException("HFileSource#read(Configuration) is not implemented yet");
+  }
+
+  @Override
+  public String toString() {
+    return "HFile(" + pathsAsString() + ")";
+  }
+}
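
A side note on how the row-range filter travels from HFileSource to HFileInputFormat: the
Scan's start/stop rows are hex-encoded into plain configuration strings by
createInputFormatBundle above and decoded again in the record reader. A small self-contained
sketch of that round trip (the key literal matches HFileInputFormat.START_ROW_KEY):

import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.Bytes;

public class RowKeyConfigRoundTrip {
  public static void main(String[] args) throws DecoderException {
    Configuration conf = new Configuration();
    byte[] startRow = Bytes.toBytes("row2");

    // Source side: encode the binary row key as hex so it survives as a config string.
    conf.set("crunch.hbase.hfile.input.format.start.row", Hex.encodeHexString(startRow));

    // Record reader side: decode it back before seeking the HFile scanner.
    byte[] decoded = Hex.decodeHex(conf.get("crunch.hbase.hfile.input.format.start.row").toCharArray());
    System.out.println(Bytes.toStringBinary(decoded)); // prints: row2
  }
}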

http://git-wip-us.apache.org/repos/asf/crunch/blob/71e67830/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
index df98325..e2f1520 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
@@ -18,6 +18,7 @@
 package org.apache.crunch.io.hbase;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -29,22 +30,35 @@ import org.apache.crunch.MapFn;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.lib.sort.TotalOrderPartitioner;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.SequenceFile;
 
 import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Set;
 
+import static org.apache.crunch.types.writable.Writables.bytes;
 import static org.apache.crunch.types.writable.Writables.nulls;
 import static org.apache.crunch.types.writable.Writables.tableOf;
 import static org.apache.crunch.types.writable.Writables.writables;
@@ -69,6 +83,118 @@ public final class HFileUtils {
     }
   }
 
+  private static class StartRowFilterFn extends FilterFn<KeyValue> {
+
+    private final byte[] startRow;
+
+    private StartRowFilterFn(byte[] startRow) {
+      this.startRow = startRow;
+    }
+
+    @Override
+    public boolean accept(KeyValue input) {
+      return Bytes.compareTo(input.getRow(), startRow) >= 0;
+    }
+  }
+
+  private static class StopRowFilterFn extends FilterFn<KeyValue> {
+
+    private final byte[] stopRow;
+
+    private StopRowFilterFn(byte[] stopRow) {
+      this.stopRow = stopRow;
+    }
+
+    @Override
+    public boolean accept(KeyValue input) {
+      return Bytes.compareTo(input.getRow(), stopRow) < 0;
+    }
+  }
+
+  private static class FamilyMapFilterFn extends FilterFn<KeyValue> {
+
+    private static class Column implements Serializable {
+
+      private final byte[] family;
+      private final byte[] qualifier;
+
+      private Column(byte[] family, byte[] qualifier) {
+        this.family = family;
+        this.qualifier = qualifier;
+      }
+
+      private byte[] getFamily() {
+        return family;
+      }
+
+      private byte[] getQualifier() {
+        return qualifier;
+      }
+    }
+
+    private final List<byte[]> families = Lists.newArrayList();
+    private final List<Column> qualifiers = Lists.newArrayList();
+
+    private transient Set<ByteBuffer> familySet;
+    private transient Set<Pair<ByteBuffer, ByteBuffer>> qualifierSet;
+
+    private FamilyMapFilterFn(Map<byte[], NavigableSet<byte[]>> familyMap) {
+      // Hold families and qualifiers in Lists, as ByteBuffer is not Serializable.
+      for (Map.Entry<byte[], NavigableSet<byte[]>> e : familyMap.entrySet()) {
+        byte[] f = e.getKey();
+        if (e.getValue() == null) {
+          families.add(f);
+        } else {
+          for (byte[] q : e.getValue()) {
+            qualifiers.add(new Column(f, q));
+          }
+        }
+      }
+    }
+
+    @Override
+    public void initialize() {
+      ImmutableSet.Builder<ByteBuffer> familySetBuilder = ImmutableSet.builder();
+      ImmutableSet.Builder<Pair<ByteBuffer, ByteBuffer>> qualifierSetBuilder
+          = ImmutableSet.builder();
+      for (byte[] f : families) {
+        familySetBuilder.add(ByteBuffer.wrap(f));
+      }
+      for (Column e : qualifiers) {
+        byte[] f = e.getFamily();
+        byte[] q = e.getQualifier();
+        qualifierSetBuilder.add(Pair.of(ByteBuffer.wrap(f), ByteBuffer.wrap(q)));
+      }
+      this.familySet = familySetBuilder.build();
+      this.qualifierSet = qualifierSetBuilder.build();
+    }
+
+    @Override
+    public boolean accept(KeyValue input) {
+      byte[] b = input.getBuffer();
+      ByteBuffer f = ByteBuffer.wrap(b, input.getFamilyOffset(), input.getFamilyLength());
+      ByteBuffer q = ByteBuffer.wrap(b, input.getQualifierOffset(), input.getQualifierLength());
+      return familySet.contains(f) || qualifierSet.contains(Pair.of(f, q));
+    }
+  }
+
+  private static class TimeRangeFilterFn extends FilterFn<KeyValue> {
+
+    private final long minTimestamp;
+    private final long maxTimestamp;
+
+    private TimeRangeFilterFn(TimeRange timeRange) {
+      // Can't save TimeRange to member directly, as it is not Serializable.
+      this.minTimestamp = timeRange.getMin();
+      this.maxTimestamp = timeRange.getMax();
+    }
+
+    @Override
+    public boolean accept(KeyValue input) {
+      return (minTimestamp <= input.getTimestamp() && input.getTimestamp() < maxTimestamp);
+    }
+  }
+
   private static class KeyValueComparator implements RawComparator<KeyValue> {
 
     @Override
@@ -92,7 +218,84 @@ public final class HFileUtils {
     }
   }
 
-  private HFileUtils() {}
+  private static final MapFn<KeyValue, ByteBuffer> EXTRACT_ROW_FN = new MapFn<KeyValue, ByteBuffer>() {
+    @Override
+    public ByteBuffer map(KeyValue input) {
+      // we have to make a copy of row, because the buffer may be changed after this call
+      return ByteBuffer.wrap(Arrays.copyOfRange(
+          input.getBuffer(), input.getRowOffset(), input.getRowOffset() + input.getRowLength()));
+    }
+  };
+
+  public static PCollection<Result> scanHFiles(Pipeline pipeline, Path path) {
+    return scanHFiles(pipeline, path, new Scan());
+  }
+
+  /**
+   * Scans HFiles with filter conditions.
+   *
+   * @param pipeline the pipeline
+   * @param path path to HFiles
+   * @param scan filtering conditions
+   * @return {@code Result}s
+   * @see #combineIntoRow(org.apache.crunch.PCollection, org.apache.hadoop.hbase.client.Scan)
+   */
+  public static PCollection<Result> scanHFiles(Pipeline pipeline, Path path, Scan scan) {
+    // TODO(chaoshi): HFileInputFormat may skip some HFiles if their KVs do not fall into
+    //                the range specified by this scan.
+    PCollection<KeyValue> in = pipeline.read(new HFileSource(ImmutableList.of(path), scan));
+    return combineIntoRow(in, scan);
+  }
+
+  public static PCollection<Result> combineIntoRow(PCollection<KeyValue> kvs) {
+    return combineIntoRow(kvs, new Scan());
+  }
+
+  /**
+   * Converts {@link KeyValue}s into {@link Result}s.
+   *
+   * All {@code KeyValue}s belonging to the same row are combined. Users may provide filter
+   * conditions (specified by {@code scan}). Deletes are dropped and only the specified number
+   * of versions is kept.
+   *
+   * @param kvs the input {@code KeyValue}s
+   * @param scan filter conditions; currently start row, stop row, family map and time range are supported
+   * @return {@code Result}s
+   */
+  public static PCollection<Result> combineIntoRow(PCollection<KeyValue> kvs, Scan scan) {
+    if (!Bytes.equals(scan.getStartRow(), HConstants.EMPTY_START_ROW)) {
+      kvs = kvs.filter(new StartRowFilterFn(scan.getStartRow()));
+    }
+    if (!Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) {
+      kvs = kvs.filter(new StopRowFilterFn(scan.getStopRow()));
+    }
+    if (scan.hasFamilies()) {
+      kvs = kvs.filter(new FamilyMapFilterFn(scan.getFamilyMap()));
+    }
+    TimeRange timeRange = scan.getTimeRange();
+    if (timeRange != null && (timeRange.getMin() > 0 || timeRange.getMax() < Long.MAX_VALUE)) {
+      kvs = kvs.filter(new TimeRangeFilterFn(timeRange));
+    }
+    // TODO(chaoshi): support Scan#getFilter
+
+    PTable<ByteBuffer, KeyValue> kvsByRow = kvs.by(EXTRACT_ROW_FN, bytes());
+    final int versions = scan.getMaxVersions();
+    return kvsByRow.groupByKey().parallelDo("CombineKeyValueIntoRow",
+        new DoFn<Pair<ByteBuffer, Iterable<KeyValue>>, Result>() {
+          @Override
+          public void process(Pair<ByteBuffer, Iterable<KeyValue>> input, Emitter<Result> emitter) {
+            List<KeyValue> kvs = Lists.newArrayList();
+            for (KeyValue kv : input.second()) {
+              kvs.add(kv.clone()); // assuming the input fits into memory
+            }
+            Result result = doCombineIntoRow(kvs, versions);
+            if (result == null) {
+              return;
+            }
+            emitter.emit(result);
+          }
+        }, writables(Result.class));
+  }
 
   public static void writeToHFilesForIncrementalLoad(
       PCollection<KeyValue> kvs,
@@ -178,4 +381,101 @@ public final class HFileUtils {
     }
     writer.close();
   }
+
+  private static Result doCombineIntoRow(List<KeyValue> kvs, int versions) {
+    // shortcut for the common case
+    if (kvs.isEmpty()) {
+      return null;
+    }
+    if (kvs.size() == 1 && kvs.get(0).getType() == KeyValue.Type.Put.getCode()) {
+      return new Result(kvs);
+    }
+
+    kvs = maybeDeleteFamily(kvs);
+
+    // Sort KeyValues in place by family and qualifier, and then by timestamp in descending order.
+    Collections.sort(kvs, KeyValue.COMPARATOR);
+
+    List<KeyValue> results = Lists.newArrayListWithCapacity(kvs.size());
+    for (int i = 0, j; i < kvs.size(); i = j) {
+      j = i + 1;
+      while (j < kvs.size() && hasSameFamilyAndQualifier(kvs.get(i), kvs.get(j))) {
+        j++;
+      }
+      results.addAll(getLatestKeyValuesOfColumn(kvs.subList(i, j), versions));
+    }
+    return new Result(results);
+  }
+
+  /**
+   * Removes any {@link KeyValue}s whose timestamp is less than or equal to the latest
+   * delete family timestamp, along with the delete family {@code KeyValue}s themselves.
+   */
+  private static List<KeyValue> maybeDeleteFamily(List<KeyValue> kvs) {
+    long deleteFamilyCut = 0;
+    for (KeyValue kv : kvs) {
+      if (kv.getType() == KeyValue.Type.DeleteFamily.getCode()) {
+        deleteFamilyCut = Math.max(deleteFamilyCut, kv.getTimestamp());
+      }
+    }
+    if (deleteFamilyCut == 0) {
+      return kvs;
+    }
+    List<KeyValue> results = Lists.newArrayList();
+    for (KeyValue kv : kvs) {
+      if (kv.getType() == KeyValue.Type.DeleteFamily.getCode()) {
+        continue;
+      }
+      if (kv.getTimestamp() <= deleteFamilyCut) {
+        continue;
+      }
+      results.add(kv);
+    }
+    return results;
+  }
+
+  private static boolean hasSameFamilyAndQualifier(KeyValue l, KeyValue r) {
+    return Bytes.equals(
+        l.getBuffer(), l.getFamilyOffset(), l.getFamilyLength(),
+        r.getBuffer(), r.getFamilyOffset(), r.getFamilyLength())
+        && Bytes.equals(
+        l.getBuffer(), l.getQualifierOffset(), l.getQualifierLength(),
+        r.getBuffer(), r.getQualifierOffset(), r.getQualifierLength());
+  }
+
+  /**
+   * Goes over the given {@link KeyValue}s and removes {@code Delete}s and {@code DeleteColumn}s.
+   *
+   * @param kvs {@code KeyValue}s of the same row and column, sorted by timestamp in
+   *            descending order
+   * @param versions the number of versions to keep
+   * @return the resulting {@code KeyValue}s that contains only {@code Put}s
+   */
+  private static List<KeyValue> getLatestKeyValuesOfColumn(List<KeyValue> kvs, int versions) {
+    if (kvs.isEmpty()) {
+      return kvs;
+    }
+    if (kvs.get(0).getType() == KeyValue.Type.Put.getCode()) {
+      return kvs; // shortcut for the common case
+    }
+
+    List<KeyValue> results = Lists.newArrayListWithCapacity(versions);
+    long previousDeleteTimestamp = -1;
+    for (KeyValue kv : kvs) {
+      if (results.size() >= versions) {
+        break;
+      }
+      if (kv.getType() == KeyValue.Type.DeleteColumn.getCode()) {
+        break;
+      } else if (kv.getType() == KeyValue.Type.Put.getCode()
+          && kv.getTimestamp() != previousDeleteTimestamp) {
+        results.add(kv);
+      } else if (kv.getType() == KeyValue.Type.Delete.getCode()) {
+        previousDeleteTimestamp = kv.getTimestamp();
+      } else {
+        throw new AssertionError("Unexpected KeyValue type: " + kv.getType());
+      }
+    }
+    return results;
+  }
 }
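
HFileUtils.combineIntoRow also works directly on any PCollection<KeyValue>, not only on
collections read through scanHFiles. A minimal sketch (the path, family and qualifier names
are hypothetical; the filters mirror the cases exercised by HFileSourceIT above):

import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.hbase.FromHBase;
import org.apache.crunch.io.hbase.HFileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class CombineIntoRowSketch {
  public static void main(String[] args) throws Exception {
    Pipeline pipeline = new MRPipeline(CombineIntoRowSketch.class, new Configuration());
    PCollection<KeyValue> kvs = pipeline.read(FromHBase.hfile(new Path("/hfiles/mytable")));

    // Keep everything in family "f1", plus the single column f2:q2, restricted to
    // timestamps in [2, 3); Scan#setTimeRange declares IOException.
    Scan scan = new Scan();
    scan.addFamily(Bytes.toBytes("f1"));
    scan.addColumn(Bytes.toBytes("f2"), Bytes.toBytes("q2"));
    scan.setTimeRange(2L, 3L);

    PCollection<Result> rows = HFileUtils.combineIntoRow(kvs, scan);
    for (Result r : rows.materialize()) {
      System.out.println(Bytes.toStringBinary(r.getRow()) + " -> " + r.size() + " cells");
    }
    pipeline.done();
  }
}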

