crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mkw...@apache.org
Subject crunch git commit: CRUNCH-533: Extend HBaseSourceTarget to support custom MultiTableInputFormat implementations.
Date Thu, 02 Jul 2015 02:42:34 GMT
Repository: crunch
Updated Branches:
  refs/heads/master d176778cf -> b06c5cc27


CRUNCH-533: Extend HBaseSourceTarget to support custom MultiTableInputFormat implementations.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/b06c5cc2
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/b06c5cc2
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/b06c5cc2

Branch: refs/heads/master
Commit: b06c5cc27b97d92d5c0b689cf58af90166155717
Parents: d176778
Author: Micah Whitacre <mkwhit@gmail.com>
Authored: Mon Jun 29 20:53:45 2015 -0500
Committer: Micah Whitacre <mkwhit@gmail.com>
Committed: Mon Jun 29 20:53:45 2015 -0500

----------------------------------------------------------------------
 .../crunch/io/hbase/WordCountHBaseIT.java       | 30 +++++++++++++++++++-
 .../crunch/io/hbase/HBaseSourceTarget.java      |  9 ++++--
 2 files changed, 36 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/b06c5cc2/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
index dd48352..28ead90 100644
--- a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
+++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.crunch.DoFn;
 import org.apache.crunch.Emitter;
@@ -48,6 +49,8 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.MultiTableInputFormat;
+import org.apache.hadoop.hbase.mapreduce.MultiTableInputFormatBase;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.After;
 import org.junit.Before;
@@ -129,6 +132,12 @@ public class WordCountHBaseIT {
     run(new MRPipeline(WordCountHBaseIT.class, hbaseTestUtil.getConfiguration()));
   }
 
+  @Test
+  public void testWordCountCustomFormat() throws Exception {
+    run(new MRPipeline(WordCountHBaseIT.class, hbaseTestUtil.getConfiguration()), MyTableInputFormat.class);
+    assertTrue(MyTableInputFormat.CONSTRUCTED.get());
+  }
+
   @After
   public void tearDown() throws Exception {
     hbaseTestUtil.shutdownMiniHBaseCluster();
@@ -136,6 +145,10 @@ public class WordCountHBaseIT {
   }
 
   public void run(Pipeline pipeline) throws Exception {
+    run(pipeline, null);
+  }
+
+  public void run(Pipeline pipeline, Class<? extends MultiTableInputFormatBase> clazz) throws Exception {
 
     Random rand = new Random();
     int postFix = rand.nextInt() & 0x7FFFFFFF;
@@ -163,7 +176,13 @@ public class WordCountHBaseIT {
     scan.addFamily(WORD_COLFAM);
     scan2.setStartRow(cutoffPoint);
 
-    HBaseSourceTarget source = new HBaseSourceTarget(inputTableName, scan, scan2);
+    HBaseSourceTarget source = null;
+    if(clazz == null){
+      source = new HBaseSourceTarget(inputTableName, scan, scan2);
+    }else{
+      source = new HBaseSourceTarget(inputTableName, clazz, new Scan[]{scan, scan2});
+    }
+
     PTable<ImmutableBytesWritable, Result> words = pipeline.read(source);
 
     Map<ImmutableBytesWritable, Result> materialized = words.materializeToMap();
@@ -237,4 +256,13 @@ public class WordCountHBaseIT {
     assertTrue(result.isEmpty());
   }
 
+  public static class MyTableInputFormat extends MultiTableInputFormat{
+
+    public static final AtomicBoolean CONSTRUCTED = new AtomicBoolean();
+
+    public MyTableInputFormat(){
+      CONSTRUCTED.set(true);
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/b06c5cc2/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
index a957898..c98436d 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.MultiTableInputFormat;
+import org.apache.hadoop.hbase.mapreduce.MultiTableInputFormatBase;
 import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -65,7 +66,7 @@ public class HBaseSourceTarget extends HBaseTarget implements
 
   protected Scan[] scans;
   protected String scansAsString;
-  private FormatBundle<MultiTableInputFormat> inputBundle;
+  private FormatBundle<? extends MultiTableInputFormatBase> inputBundle;
   
   public HBaseSourceTarget(String table, Scan scan) {
     this(table, new Scan[] { scan });
@@ -76,6 +77,10 @@ public class HBaseSourceTarget extends HBaseTarget implements
   }
   
   public HBaseSourceTarget(String table, Scan[] scans) {
+    this(table, MultiTableInputFormat.class, scans);
+  }
+
+  public HBaseSourceTarget(String table, Class<? extends MultiTableInputFormatBase> clazz,  Scan[] scans) {
     super(table);
     this.scans = scans;
 
@@ -94,7 +99,7 @@ public class HBaseSourceTarget extends HBaseTarget implements
       }
       this.scans = tableScans;
       this.scansAsString = StringUtils.arrayToString(scanStrings);
-      this.inputBundle = FormatBundle.forInput(MultiTableInputFormat.class)
+      this.inputBundle = FormatBundle.forInput(clazz)
           .set(MultiTableInputFormat.SCANS, scansAsString);
     } catch (IOException e) {
       throw new RuntimeException(e);


Mime
View raw message