crunch-commits mailing list archives

From jwi...@apache.org
Subject crunch git commit: CRUNCH-475: Update HBase to 1.0.0 and Hadoop to 2.5.2
Date Sat, 23 May 2015 21:26:52 GMT
Repository: crunch
Updated Branches:
  refs/heads/master 6c8c5ffd0 -> 5bd258d1c


CRUNCH-475: Update HBase to 1.0.0 and Hadoop to 2.5.2


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/5bd258d1
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/5bd258d1
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/5bd258d1

Branch: refs/heads/master
Commit: 5bd258d1c0244fdb2c8a6d83b766fde08bb33354
Parents: 6c8c5ff
Author: Josh Wills <jwills@apache.org>
Authored: Tue Jan 13 16:13:25 2015 -0800
Committer: Josh Wills <jwills@apache.org>
Committed: Sat May 23 14:12:04 2015 -0700

----------------------------------------------------------------------
 crunch-hbase/pom.xml                            |  78 ++++---------
 .../apache/crunch/io/hbase/HFileTargetIT.java   |  17 ++-
 .../org/apache/crunch/io/hbase/HBaseTypes.java  |  33 +++++-
 .../crunch/io/hbase/HFileInputFormat.java       |   9 +-
 .../io/hbase/HFileOutputFormatForCrunch.java    |  22 ++--
 .../crunch/io/hbase/HFileReaderFactory.java     |   6 +-
 .../org/apache/crunch/io/hbase/HFileTarget.java |  10 +-
 .../org/apache/crunch/io/hbase/HFileUtils.java  | 115 +++++++++----------
 crunch-spark/pom.xml                            |   4 +
 pom.xml                                         |  11 +-
 10 files changed, 153 insertions(+), 152 deletions(-)
----------------------------------------------------------------------
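
The bulk of the patch migrates crunch-hbase from the KeyValue-centric API of HBase 0.98 to the Cell interface that HBase 1.0 treats as the primary read/write abstraction. A minimal sketch of the recurring pattern, assuming the HBase 1.0 client API is on the classpath (the row key and value below are illustrative only):

import com.google.common.collect.ImmutableList;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.util.Bytes;

public class CellMigrationSketch {
  public static void main(String[] args) {
    // Build a Cell instead of constructing a KeyValue directly.
    Cell cell = CellUtil.createCell(Bytes.toBytes("row-1"), Bytes.toBytes(42L));

    // Where a concrete KeyValue is still required (serialization, timestamp fixups),
    // copy the Cell rather than casting it.
    KeyValue kv = KeyValue.cloneAndAddTags(cell, ImmutableList.<Tag>of());

    // Read components through CellUtil instead of poking at the backing buffer.
    System.out.println(Bytes.toStringBinary(CellUtil.cloneRow(kv)));
  }
}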


http://git-wip-us.apache.org/repos/asf/crunch/blob/5bd258d1/crunch-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-hbase/pom.xml b/crunch-hbase/pom.xml
index d1a33f5..5956faf 100644
--- a/crunch-hbase/pom.xml
+++ b/crunch-hbase/pom.xml
@@ -88,12 +88,6 @@ under the License.
 
     <dependency>
       <groupId>org.apache.hbase</groupId>
-      <artifactId>hbase-${hbase.midfix}-compat</artifactId>
-      <scope>provided</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-testing-util</artifactId>
       <scope>provided</scope>
     </dependency>
@@ -128,55 +122,33 @@ under the License.
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-auth</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-annotations</artifactId>
+    </dependency>
   </dependencies>
 
-  <profiles>
-    <profile>
-      <id>hadoop-1</id>
-      <activation>
-        <property>
-          <name>!crunch.platform</name>
-        </property>
-      </activation>
-    </profile>
-    <profile>
-      <id>hadoop-2</id>
-      <activation>
-        <property>
-          <name>crunch.platform</name>
-          <value>2</value>
-        </property>
-      </activation>
-      <dependencies>
-        <dependency>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-common</artifactId>
-        </dependency>
-        <dependency>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-auth</artifactId>
-        </dependency>
-        <dependency>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-hdfs</artifactId>
-        </dependency>
-        <dependency>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-mapreduce-client-core</artifactId>
-        </dependency>
-        <dependency>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
-          <type>test-jar</type>
-        </dependency>
-        <dependency>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-annotations</artifactId>
-        </dependency>
-      </dependencies>
-    </profile>
-  </profiles>
-
   <build>
     <plugins>
       <plugin>

http://git-wip-us.apache.org/repos/asf/crunch/blob/5bd258d1/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
index 71cf31f..ddb1292 100644
--- a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
+++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
@@ -17,6 +17,7 @@
  */
 package org.apache.crunch.io.hbase;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.io.Resources;
@@ -41,11 +42,14 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
@@ -316,7 +320,8 @@ public class HFileTargetIT implements Serializable {
           w = "__EMPTY__";
         }
         long c = input.second();
-        return Pair.of(new KeyValue(Bytes.toBytes(w), TEST_FAMILY, TEST_QUALIFIER, Bytes.toBytes(c)), null);
+        Cell cell = CellUtil.createCell(Bytes.toBytes(w), Bytes.toBytes(c));
+        return Pair.of(KeyValue.cloneAndAddTags(cell, ImmutableList.<Tag>of()), null);
       }
     }, tableOf(HBaseTypes.keyValues(), nulls()))
         .groupByKey(GroupingOptions.builder()
@@ -359,9 +364,9 @@ public class HFileTargetIT implements Serializable {
     KeyValueScanner kvh = new KeyValueHeap(scanners, KeyValue.COMPARATOR);
     boolean seekOk = kvh.seek(fakeKV);
     assertTrue(seekOk);
-    KeyValue kv = kvh.next();
+    Cell kv = kvh.next();
     kvh.close();
-    return kv;
+    return KeyValue.cloneAndAddTags(kv, ImmutableList.<Tag>of());
   }
 
   private static Path copyResourceFileToHDFS(String resourceName) throws IOException {
@@ -390,11 +395,11 @@ public class HFileTargetIT implements Serializable {
 
   private static long getWordCountFromTable(HTable table, String word) throws IOException {
     Get get = new Get(Bytes.toBytes(word));
-    KeyValue keyValue = table.get(get).getColumnLatest(TEST_FAMILY, TEST_QUALIFIER);
-    if (keyValue == null) {
+    byte[] value = table.get(get).value();
+    if (value == null) {
       fail("no such row: " +  word);
     }
-    return Bytes.toLong(keyValue.getValue());
+    return Bytes.toLong(value);
   }
 }
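
The word-count assertion above now goes through Result#value(), which returns the value of the first cell of the most recent column, instead of the KeyValue accessor dropped from the 1.0 API. A rough equivalent of the updated lookup, assuming a table whose values are long-encoded counters (helper name is hypothetical):

import java.io.IOException;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.util.Bytes;

final class WordCountLookup {
  // Hypothetical helper mirroring getWordCountFromTable: fetch the latest value
  // for a row and decode it as a long, or return -1 if the row is absent.
  static long lookup(HTable table, String word) throws IOException {
    byte[] value = table.get(new Get(Bytes.toBytes(word))).value();
    return value == null ? -1L : Bytes.toLong(value);
  }
}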
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/5bd258d1/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTypes.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTypes.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTypes.java
index f8a259d..787b9c6 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTypes.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTypes.java
@@ -19,15 +19,17 @@
  */
 package org.apache.crunch.io.hbase;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.crunch.CrunchRuntimeException;
 import org.apache.crunch.MapFn;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.mapreduce.KeyValueSerialization;
 import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
 import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
 import org.apache.hadoop.io.BytesWritable;
@@ -83,15 +85,36 @@ public final class HBaseTypes {
         Writables.writables(BytesWritable.class));
   }
 
-  public static BytesWritable keyValueToBytes(KeyValue input) {
+  public static final PType<Cell> cells() {
+    return Writables.derived(Cell.class,
+        new MapFn<BytesWritable, Cell>() {
+          @Override
+          public Cell map(BytesWritable input) {
+            return bytesToKeyValue(input);
+          }
+        },
+        new MapFn<Cell, BytesWritable>() {
+          @Override
+          public BytesWritable map(Cell input) {
+            return keyValueToBytes(input);
+          }
+        },
+        Writables.writables(BytesWritable.class));
+  }
+
+  public static BytesWritable keyValueToBytes(Cell input) {
+    return keyValueToBytes(KeyValue.cloneAndAddTags(input, ImmutableList.<Tag>of()));
+  }
+
+  public static BytesWritable keyValueToBytes(KeyValue kv) {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     DataOutputStream dos = new DataOutputStream(baos);
     try {
-      KeyValue.write(input, dos);
-    } catch (IOException e) {
+      KeyValue.write(kv, dos);
+      return new BytesWritable(baos.toByteArray());
+    } catch (Exception e) {
       throw new CrunchRuntimeException(e);
     }
-    return new BytesWritable(baos.toByteArray());
   }
 
   public static KeyValue bytesToKeyValue(BytesWritable input) {
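
The new HBaseTypes.cells() PType lets a pipeline carry Cell values directly, serializing them through the same BytesWritable round trip that keyValues() already uses. A hedged usage sketch (the row keys and payload are illustrative, not part of this patch):

import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.io.hbase.HBaseTypes;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.util.Bytes;

final class CellsPTypeSketch {
  // Map a collection of row keys to Cells using the new PType.
  static PCollection<Cell> toCells(PCollection<String> rows) {
    return rows.parallelDo(new MapFn<String, Cell>() {
      @Override
      public Cell map(String row) {
        return CellUtil.createCell(Bytes.toBytes(row), Bytes.toBytes(1L));
      }
    }, HBaseTypes.cells());
  }
}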

http://git-wip-us.apache.org/repos/asf/crunch/blob/5bd258d1/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileInputFormat.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileInputFormat.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileInputFormat.java
index ca886f6..1d8e106 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileInputFormat.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileInputFormat.java
@@ -17,6 +17,7 @@
  */
 package org.apache.crunch.io.hbase;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.commons.codec.DecoderException;
 import org.apache.commons.codec.binary.Hex;
 import org.apache.hadoop.conf.Configuration;
@@ -24,7 +25,9 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
@@ -140,10 +143,8 @@ public class HFileInputFormat extends FileInputFormat<NullWritable, KeyValue> {
       if (!hasNext) {
         return false;
       }
-      value = scanner.getKeyValue();
-      if (stopRow != null && Bytes.compareTo(
-          value.getBuffer(), value.getRowOffset(), value.getRowLength(),
-          stopRow, 0, stopRow.length) >= 0) {
+      value = KeyValue.cloneAndAddTags(scanner.getKeyValue(), ImmutableList.<Tag>of());
+      if (stopRow != null && Bytes.compareTo(CellUtil.cloneRow(value), stopRow) >= 0) {
         if(LOG.isInfoEnabled()) {
           LOG.info("Reached stop row {}", Bytes.toStringBinary(stopRow));
         }
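
The stop-row test now compares against a copy of the row obtained via CellUtil.cloneRow rather than indexing into the KeyValue's backing buffer. In isolation, the check reduces to roughly this (class and method names are illustrative):

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.util.Bytes;

final class StopRowCheck {
  // True once the cell's row key is at or past the exclusive stop row.
  static boolean reachedStopRow(Cell cell, byte[] stopRow) {
    return stopRow != null
        && Bytes.compareTo(CellUtil.cloneRow(cell), stopRow) >= 0;
  }
}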

http://git-wip-us.apache.org/repos/asf/crunch/blob/5bd258d1/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java
index 0c64e5e..7611235 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java
@@ -19,14 +19,17 @@
  */
 package org.apache.crunch.io.hbase;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.commons.codec.DecoderException;
 import org.apache.commons.codec.binary.Hex;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
@@ -44,7 +47,7 @@ import java.io.DataInputStream;
 import java.io.IOException;
 
 /**
- * This is a thin wrapper of {@link HFile.Writer}. It only calls {@link HFile.Writer#append(byte[], byte[])}
+ * This is a thin wrapper of {@link HFile.Writer}. It only calls {@link HFile.Writer#append}
  * when records are emitted. It only supports writing data into a single column family. Records MUST be sorted
  * by their column qualifier, then timestamp reversely. All data are written into a single HFile.
  *
@@ -53,7 +56,7 @@ import java.io.IOException;
 * As crunch supports more complex and flexible MapReduce pipeline, we would prefer thin and pure
  * {@code OutputFormat} here.
  */
-public class HFileOutputFormatForCrunch extends FileOutputFormat<Object, KeyValue> {
+public class HFileOutputFormatForCrunch extends FileOutputFormat<Object, Cell> {
 
   public static final String HCOLUMN_DESCRIPTOR_KEY = "hbase.hfileoutputformat.column.descriptor";
   private static final String COMPACTION_EXCLUDE_CONF_KEY = "hbase.mapreduce.hfileoutputformat.compaction.exclude";
@@ -63,7 +66,7 @@ public class HFileOutputFormatForCrunch extends FileOutputFormat<Object, KeyValu
   private final TimeRangeTracker trt = new TimeRangeTracker();
 
   @Override
-  public RecordWriter<Object, KeyValue> getRecordWriter(final TaskAttemptContext context)
+  public RecordWriter<Object, Cell> getRecordWriter(final TaskAttemptContext context)
       throws IOException, InterruptedException {
     Path outputPath = getDefaultWorkFile(context, "");
     Configuration conf = context.getConfiguration();
@@ -92,15 +95,16 @@ public class HFileOutputFormatForCrunch extends FileOutputFormat<Object, KeyValu
         .withFileContext(getContext(hcol))
         .create();
 
-    return new RecordWriter<Object, KeyValue>() {
+    return new RecordWriter<Object, Cell>() {
       @Override
-      public void write(Object row, KeyValue kv)
+      public void write(Object row, Cell cell)
           throws IOException {
-        if (kv.getTimestamp() == HConstants.LATEST_TIMESTAMP) {
-          kv.updateLatestStamp(now);
+        KeyValue copy = KeyValue.cloneAndAddTags(cell, ImmutableList.<Tag>of());
+        if (copy.getTimestamp() == HConstants.LATEST_TIMESTAMP) {
+          copy.updateLatestStamp(now);
         }
-        writer.append(kv);
-        trt.includeTimestamp(kv);
+        writer.append(copy);
+        trt.includeTimestamp(copy);
       }
 
       @Override
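
HFile.Writer#append accepts a Cell in 1.0, but the latest-timestamp fixup still needs a mutable KeyValue, so the record writer copies each incoming Cell before touching it. Condensed, the write path in the hunk above amounts to the following sketch (writer, trt, and now stand in for the fields prepared in getRecordWriter):

import com.google.common.collect.ImmutableList;
import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;

final class CellWriteSketch {
  // now is the byte-encoded wall-clock timestamp prepared by the output format.
  static void writeCell(HFile.Writer writer, TimeRangeTracker trt, byte[] now, Cell cell)
      throws IOException {
    // Copy into a concrete KeyValue so the timestamp can be rewritten in place.
    KeyValue copy = KeyValue.cloneAndAddTags(cell, ImmutableList.<Tag>of());
    if (copy.getTimestamp() == HConstants.LATEST_TIMESTAMP) {
      copy.updateLatestStamp(now);
    }
    writer.append(copy);
    trt.includeTimestamp(copy);
  }
}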

http://git-wip-us.apache.org/repos/asf/crunch/blob/5bd258d1/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileReaderFactory.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileReaderFactory.java
index 6189775..14e6118 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileReaderFactory.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileReaderFactory.java
@@ -17,11 +17,13 @@
  */
 package org.apache.crunch.io.hbase;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.crunch.io.FileReaderFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
@@ -57,7 +59,7 @@ public class HFileReaderFactory implements FileReaderFactory<KeyValue> {
 
     public HFileIterator(HFileScanner scanner) {
       this.scanner = scanner;
-      this.curr = scanner.getKeyValue();
+      this.curr = KeyValue.cloneAndAddTags(scanner.getKeyValue(), ImmutableList.<Tag>of());
     }
 
     @Override
@@ -70,7 +72,7 @@ public class HFileReaderFactory implements FileReaderFactory<KeyValue> {
       KeyValue ret = curr;
       try {
         if (scanner.next()) {
-          curr = scanner.getKeyValue();
+          curr = KeyValue.cloneAndAddTags(scanner.getKeyValue(), ImmutableList.<Tag>of());
         } else {
           curr = null;
         }

http://git-wip-us.apache.org/repos/asf/crunch/blob/5bd258d1/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java
index d9bbf7f..41d56ff 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java
@@ -26,9 +26,9 @@ import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.KeyValueSerialization;
 import org.apache.hadoop.io.WritableUtils;
@@ -68,13 +68,13 @@ public class HFileTarget extends FileTargetImpl {
     if (ptype instanceof PTableType) {
       valueType = ((PTableType) ptype).getValueType();
     }
-    if (!KeyValue.class.equals(valueType.getTypeClass())) {
-      throw new IllegalArgumentException("HFileTarget only supports KeyValue outputs");
+    if (!Cell.class.isAssignableFrom(valueType.getTypeClass())) {
+      throw new IllegalArgumentException("HFileTarget only supports Cell outputs");
     }
     if (ptype instanceof PTableType) {
-      return new HBasePairConverter<ImmutableBytesWritable, KeyValue>(ImmutableBytesWritable.class, KeyValue.class);
+      return new HBasePairConverter<ImmutableBytesWritable, Cell>(ImmutableBytesWritable.class, Cell.class);
     }
-    return new HBaseValueConverter<KeyValue>(KeyValue.class);
+    return new HBaseValueConverter<Cell>(Cell.class);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/crunch/blob/5bd258d1/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
index 252bad7..34118ca 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
@@ -24,7 +24,6 @@ import static org.apache.crunch.types.writable.Writables.tableOf;
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
@@ -34,6 +33,7 @@ import java.util.Set;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.primitives.Longs;
 import org.apache.crunch.DoFn;
@@ -49,9 +49,12 @@ import org.apache.crunch.impl.dist.DistributedPipeline;
 import org.apache.crunch.lib.sort.TotalOrderPartitioner;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
@@ -113,7 +116,7 @@ public final class HFileUtils {
 
   };
 
-  private static class FilterByFamilyFn extends FilterFn<KeyValue> {
+  private static class FilterByFamilyFn<C extends Cell> extends FilterFn<C> {
 
     private final byte[] family;
 
@@ -122,14 +125,12 @@ public final class HFileUtils {
     }
 
     @Override
-    public boolean accept(KeyValue input) {
-      return Bytes.equals(
-          input.getBuffer(), input.getFamilyOffset(), input.getFamilyLength(),
-          family, 0, family.length);
+    public boolean accept(C input) {
+      return Bytes.equals(CellUtil.cloneFamily(input), family);
     }
   }
 
-  private static class StartRowFilterFn extends FilterFn<KeyValue> {
+  private static class StartRowFilterFn<C extends Cell> extends FilterFn<C> {
 
     private final byte[] startRow;
 
@@ -138,12 +139,12 @@ public final class HFileUtils {
     }
 
     @Override
-    public boolean accept(KeyValue input) {
-      return Bytes.compareTo(input.getRow(), startRow) >= 0;
+    public boolean accept(C input) {
+      return Bytes.compareTo(CellUtil.cloneRow(input), startRow) >= 0;
     }
   }
 
-  private static class StopRowFilterFn extends FilterFn<KeyValue> {
+  private static class StopRowFilterFn<C extends Cell> extends FilterFn<C> {
 
     private final byte[] stopRow;
 
@@ -152,12 +153,12 @@ public final class HFileUtils {
     }
 
     @Override
-    public boolean accept(KeyValue input) {
-      return Bytes.compareTo(input.getRow(), stopRow) < 0;
+    public boolean accept(C input) {
+      return Bytes.compareTo(CellUtil.cloneRow(input), stopRow) < 0;
     }
   }
 
-  private static class FamilyMapFilterFn extends FilterFn<KeyValue> {
+  private static class FamilyMapFilterFn<C extends Cell> extends FilterFn<C> {
 
     private static class Column implements Serializable {
 
@@ -216,15 +217,14 @@ public final class HFileUtils {
     }
 
     @Override
-    public boolean accept(KeyValue input) {
-      byte[] b = input.getBuffer();
-      ByteBuffer f = ByteBuffer.wrap(b, input.getFamilyOffset(), input.getFamilyLength());
-      ByteBuffer q = ByteBuffer.wrap(b, input.getQualifierOffset(), input.getQualifierLength());
+    public boolean accept(C input) {
+      ByteBuffer f = ByteBuffer.wrap(CellUtil.cloneFamily(input));
+      ByteBuffer q = ByteBuffer.wrap(CellUtil.cloneQualifier(input));
       return familySet.contains(f) || qualifierSet.contains(Pair.of(f, q));
     }
   }
 
-  private static class TimeRangeFilterFn extends FilterFn<KeyValue> {
+  private static class TimeRangeFilterFn<C extends Cell> extends FilterFn<C> {
 
     private final long minTimestamp;
     private final long maxTimestamp;
@@ -236,7 +236,7 @@ public final class HFileUtils {
     }
 
     @Override
-    public boolean accept(KeyValue input) {
+    public boolean accept(C input) {
       return (minTimestamp <= input.getTimestamp() && input.getTimestamp() < maxTimestamp);
     }
   }
@@ -253,8 +253,8 @@ public final class HFileUtils {
       if (rlength < 4) {
         throw new AssertionError("Too small rlength: " + rlength);
       }
-      KeyValue leftKey = HBaseTypes.bytesToKeyValue(left, loffset + 4, llength - 4);
-      KeyValue rightKey = HBaseTypes.bytesToKeyValue(right, roffset + 4, rlength - 4);
+      Cell leftKey = HBaseTypes.bytesToKeyValue(left, loffset + 4, llength - 4);
+      Cell rightKey = HBaseTypes.bytesToKeyValue(right, roffset + 4, rlength - 4);
 
       byte[] lRow = leftKey.getRow();
       byte[] rRow = rightKey.getRow();
@@ -274,14 +274,13 @@ public final class HFileUtils {
     }
   }
 
-  private static final MapFn<KeyValue, ByteBuffer> EXTRACT_ROW_FN = new MapFn<KeyValue, ByteBuffer>() {
+  private static class ExtractRowFn<C extends Cell> extends MapFn<C, ByteBuffer> {
     @Override
-    public ByteBuffer map(KeyValue input) {
+    public ByteBuffer map(Cell input) {
       // we have to make a copy of row, because the buffer may be changed after this call
-      return ByteBuffer.wrap(Arrays.copyOfRange(
-          input.getBuffer(), input.getRowOffset(), input.getRowOffset() + input.getRowLength()));
+      return ByteBuffer.wrap(CellUtil.cloneRow(input));
     }
-  };
+  }
 
   public static PCollection<Result> scanHFiles(Pipeline pipeline, Path path) {
     return scanHFiles(pipeline, path, new Scan());
@@ -305,8 +304,8 @@ public final class HFileUtils {
       return combineIntoRow(in, scan);
   }
 
-  public static PCollection<Result> combineIntoRow(PCollection<KeyValue> kvs) {
-    return combineIntoRow(kvs, new Scan());
+  public static <C extends Cell> PCollection<Result> combineIntoRow(PCollection<C> cells) {
+    return combineIntoRow(cells, new Scan());
   }
 
   /**
@@ -316,41 +315,41 @@ public final class HFileUtils {
    * conditions (specified by {@code scan}). Deletes are dropped and only a specified number
    * of versions are kept.
    *
-   * @param kvs the input {@code KeyValue}s
+   * @param cells the input {@code KeyValue}s
    * @param scan filter conditions, currently we support start row, stop row and family map
    * @return {@code Result}s
    */
-  public static PCollection<Result> combineIntoRow(PCollection<KeyValue> kvs, Scan scan) {
+  public static <C extends Cell> PCollection<Result> combineIntoRow(PCollection<C> cells, Scan scan) {
     if (!Bytes.equals(scan.getStartRow(), HConstants.EMPTY_START_ROW)) {
-      kvs = kvs.filter(new StartRowFilterFn(scan.getStartRow()));
+      cells = cells.filter(new StartRowFilterFn<C>(scan.getStartRow()));
     }
     if (!Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) {
-      kvs = kvs.filter(new StopRowFilterFn(scan.getStopRow()));
+      cells = cells.filter(new StopRowFilterFn<C>(scan.getStopRow()));
     }
     if (scan.hasFamilies()) {
-      kvs = kvs.filter(new FamilyMapFilterFn(scan.getFamilyMap()));
+      cells = cells.filter(new FamilyMapFilterFn<C>(scan.getFamilyMap()));
     }
     TimeRange timeRange = scan.getTimeRange();
     if (timeRange != null && (timeRange.getMin() > 0 || timeRange.getMax() < Long.MAX_VALUE)) {
-      kvs = kvs.filter(new TimeRangeFilterFn(timeRange));
+      cells = cells.filter(new TimeRangeFilterFn<C>(timeRange));
     }
     // TODO(chaoshi): support Scan#getFilter
 
-    PTable<ByteBuffer, KeyValue> kvsByRow = kvs.by(EXTRACT_ROW_FN, bytes());
+    PTable<ByteBuffer, C> cellsByRow = cells.by(new ExtractRowFn<C>(), bytes());
     final int versions = scan.getMaxVersions();
-    return kvsByRow.groupByKey().parallelDo("CombineKeyValueIntoRow",
-        new DoFn<Pair<ByteBuffer, Iterable<KeyValue>>, Result>() {
+    return cellsByRow.groupByKey().parallelDo("CombineKeyValueIntoRow",
+        new DoFn<Pair<ByteBuffer, Iterable<C>>, Result>() {
           @Override
-          public void process(Pair<ByteBuffer, Iterable<KeyValue>> input, Emitter<Result> emitter) {
-            List<KeyValue> kvs = Lists.newArrayList();
-            for (KeyValue kv : input.second()) {
+          public void process(Pair<ByteBuffer, Iterable<C>> input, Emitter<Result> emitter) {
+            List<KeyValue> cells = Lists.newArrayList();
+            for (Cell kv : input.second()) {
               try {
-                kvs.add(kv.clone()); // assuming the input fits into memory
+                cells.add(KeyValue.cloneAndAddTags(kv, ImmutableList.<Tag>of())); // assuming the input fits into memory
               } catch (Exception e) {
                 throw new RuntimeException(e);
               }
             }
-            Result result = doCombineIntoRow(kvs, versions);
+            Result result = doCombineIntoRow(cells, versions);
             if (result == null) {
               return;
             }
@@ -359,8 +358,8 @@ public final class HFileUtils {
         }, HBaseTypes.results());
   }
 
-  public static void writeToHFilesForIncrementalLoad(
-      PCollection<KeyValue> kvs,
+  public static <C extends Cell> void writeToHFilesForIncrementalLoad(
+      PCollection<C> cells,
       HTable table,
       Path outputPath) throws IOException {
     HColumnDescriptor[] families = table.getTableDescriptor().getColumnFamilies();
@@ -370,7 +369,7 @@ public final class HFileUtils {
     }
     for (HColumnDescriptor f : families) {
       byte[] family = f.getName();
-      PCollection<KeyValue> sorted = sortAndPartition(kvs.filter(new FilterByFamilyFn(family)), table);
+      PCollection<C> sorted = sortAndPartition(cells.filter(new FilterByFamilyFn<C>(family)), table);
       sorted.write(new HFileTarget(new Path(outputPath, Bytes.toString(family)), f));
     }
   }
@@ -379,29 +378,27 @@ public final class HFileUtils {
       PCollection<Put> puts,
       HTable table,
       Path outputPath) throws IOException {
-    PCollection<KeyValue> kvs = puts.parallelDo("ConvertPutToKeyValue", new DoFn<Put, KeyValue>() {
+    PCollection<Cell> cells = puts.parallelDo("ConvertPutToCells", new DoFn<Put, Cell>() {
       @Override
-      public void process(Put input, Emitter<KeyValue> emitter) {
-        for (List<KeyValue> keyValues : input.getFamilyMap().values()) {
-          for (KeyValue keyValue : keyValues) {
-            emitter.emit(keyValue);
-          }
+      public void process(Put input, Emitter<Cell> emitter) {
+        for (Cell cell : Iterables.concat(input.getFamilyCellMap().values())) {
+          emitter.emit(cell);
         }
       }
-    }, HBaseTypes.keyValues());
-    writeToHFilesForIncrementalLoad(kvs, table, outputPath);
+    }, HBaseTypes.cells());
+    writeToHFilesForIncrementalLoad(cells, table, outputPath);
   }
 
-  public static PCollection<KeyValue> sortAndPartition(PCollection<KeyValue> kvs, HTable table) throws IOException {
-    Configuration conf = kvs.getPipeline().getConfiguration();
-    PTable<KeyValue, Void> t = kvs.parallelDo(new MapFn<KeyValue, Pair<KeyValue, Void>>() {
+  public static <C extends Cell> PCollection<C> sortAndPartition(PCollection<C> cells, HTable table) throws IOException {
+    Configuration conf = cells.getPipeline().getConfiguration();
+    PTable<C, Void> t = cells.parallelDo(new MapFn<C, Pair<C, Void>>() {
       @Override
-      public Pair<KeyValue, Void> map(KeyValue input) {
+      public Pair<C, Void> map(C input) {
         return Pair.of(input, (Void) null);
       }
-    }, tableOf(HBaseTypes.keyValues(), nulls()));
+    }, tableOf(cells.getPType(), nulls()));
     List<KeyValue> splitPoints = getSplitPoints(table);
-    Path partitionFile = new Path(((DistributedPipeline) kvs.getPipeline()).createTempPath(), "partition");
+    Path partitionFile = new Path(((DistributedPipeline) cells.getPipeline()).createTempPath(), "partition");
     writePartitionInfo(conf, partitionFile, splitPoints);
     GroupingOptions options = GroupingOptions.builder()
         .partitionerClass(TotalOrderPartitioner.class)
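
After the generification, any PCollection of Cells can be handed to the bulk-load helper unchanged. A hedged end-to-end sketch (the table name, output path, and the cellsOf step are placeholders, not part of this patch):

import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.hbase.HFileUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;

public class BulkLoadSketch {
  public static void main(String[] args) throws Exception {
    Pipeline pipeline = new MRPipeline(BulkLoadSketch.class, HBaseConfiguration.create());
    PCollection<Cell> cells = cellsOf(pipeline);  // user-supplied step yielding HBaseTypes.cells()

    HTable table = new HTable(pipeline.getConfiguration(), "my_table");
    HFileUtils.writeToHFilesForIncrementalLoad(cells, table, new Path("/tmp/hfiles"));
    pipeline.done();
  }

  // Illustrative stub: a real job would read and transform its input here.
  private static PCollection<Cell> cellsOf(Pipeline pipeline) {
    throw new UnsupportedOperationException("supply real input");
  }
}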

http://git-wip-us.apache.org/repos/asf/crunch/blob/5bd258d1/crunch-spark/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-spark/pom.xml b/crunch-spark/pom.xml
index 7757daf..690256a 100644
--- a/crunch-spark/pom.xml
+++ b/crunch-spark/pom.xml
@@ -52,6 +52,10 @@ under the License.
       <scope>provided</scope>
       <exclusions>
         <exclusion>
+	  <groupId>javax.servlet</groupId>
+	  <artifactId>servlet-api</artifactId>
+        </exclusion>
+        <exclusion>
           <groupId>com.sun.jersey</groupId>
           <artifactId>jersey-server</artifactId>
         </exclusion>

http://git-wip-us.apache.org/repos/asf/crunch/blob/5bd258d1/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index cce69bf..2eed8bc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -89,10 +89,9 @@ under the License.
     <pkg>org.apache.crunch</pkg>
 
     <!-- Can be overridden by hadoop-2 profile, but these are the default values -->
-    <hadoop.version>2.2.0</hadoop.version>
-    <hbase.version>0.98.1-hadoop2</hbase.version>
+    <hadoop.version>2.5.2</hadoop.version>
+    <hbase.version>1.0.0</hbase.version>
     <commons-lang.version>2.5</commons-lang.version>
-    <hbase.midfix>hadoop2</hbase.midfix>
     <avro.classifier>hadoop2</avro.classifier>
 
     <scala.base.version>2.10</scala.base.version>
@@ -325,12 +324,6 @@ under the License.
 
       <dependency>
         <groupId>org.apache.hbase</groupId>
-        <artifactId>hbase-${hbase.midfix}-compat</artifactId>
-        <version>${hbase.version}</version>
-      </dependency>
-
-      <dependency>
-        <groupId>org.apache.hbase</groupId>
         <artifactId>hbase-testing-util</artifactId>
         <version>${hbase.version}</version>
       </dependency>

