crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mkw...@apache.org
Subject crunch git commit: CRUNCH-550: Removed deprecations in crunch-hbase; also added support for TableName.
Date Wed, 29 Jul 2015 03:23:18 GMT
Repository: crunch
Updated Branches:
  refs/heads/master 3a1d474b0 -> 8bae517ea


CRUNCH-550: Removed deprecations in crunch-hbase; also added
 support for TableName.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/8bae517e
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/8bae517e
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/8bae517e

Branch: refs/heads/master
Commit: 8bae517ea1e1cb919ecc067eb6b877a4f58916f5
Parents: 3a1d474
Author: Micah Whitacre <mkwhit@apache.org>
Authored: Mon Jul 27 20:57:07 2015 -0500
Committer: Micah Whitacre <mkwhit@apache.org>
Committed: Tue Jul 28 22:22:53 2015 -0500

----------------------------------------------------------------------
 .../crunch/io/hbase/WordCountHBaseIT.java       |  3 +-
 .../org/apache/crunch/io/hbase/FromHBase.java   |  9 ++++++
 .../org/apache/crunch/io/hbase/HBaseData.java   | 18 ++++++++++--
 .../crunch/io/hbase/HBaseSourceTarget.java      | 30 +++++++++++++++-----
 .../org/apache/crunch/io/hbase/HBaseTarget.java | 18 +++++++++++-
 .../crunch/io/hbase/HFileInputFormat.java       |  5 ++--
 .../org/apache/crunch/io/hbase/HFileSource.java |  2 +-
 .../apache/crunch/io/hbase/HTableIterable.java  | 10 +++++--
 .../apache/crunch/io/hbase/HTableIterator.java  | 14 +++++++--
 .../org/apache/crunch/io/hbase/ToHBase.java     |  5 ++++
 10 files changed, 93 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/8bae517e/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
index 28ead90..4a06c0f 100644
--- a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
+++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
@@ -42,6 +42,7 @@ import org.apache.crunch.types.writable.Writables;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
@@ -178,7 +179,7 @@ public class WordCountHBaseIT {
 
     HBaseSourceTarget source = null;
     if(clazz == null){
-      source = new HBaseSourceTarget(inputTableName, scan, scan2);
+      source = new HBaseSourceTarget(TableName.valueOf(inputTableName), scan, scan2);
     }else{
       source = new HBaseSourceTarget(inputTableName, clazz, new Scan[]{scan, scan2});
     }

http://git-wip-us.apache.org/repos/asf/crunch/blob/8bae517e/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/FromHBase.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/FromHBase.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/FromHBase.java
index 18d5a95..16f6694 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/FromHBase.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/FromHBase.java
@@ -21,6 +21,7 @@ import org.apache.crunch.Source;
 import org.apache.crunch.TableSource;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -35,6 +36,14 @@ public class FromHBase {
   }
 
   public static TableSource<ImmutableBytesWritable, Result> table(String table, Scan scan) {
+    return table(TableName.valueOf(table), scan);
+  }
+
+  public static TableSource<ImmutableBytesWritable, Result> table(TableName table) {
+    return table(table, new Scan());
+  }
+
+  public static TableSource<ImmutableBytesWritable, Result> table(TableName table, Scan scan) {
     return new HBaseSourceTarget(table, scan);
   }
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/8bae517e/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java
index 4a721f3..4ac6c8e 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java
@@ -23,9 +23,13 @@ import org.apache.crunch.ReadableData;
 import org.apache.crunch.SourceTarget;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 import org.apache.hadoop.util.StringUtils;
@@ -36,11 +40,13 @@ import java.util.Set;
 public class HBaseData implements ReadableData<Pair<ImmutableBytesWritable, Result>> {
 
   private final String table;
+  private transient TableName tableName;
   private final String scansAsString;
   private transient SourceTarget parent;
 
   public HBaseData(String table, String scansAsString, SourceTarget<?> parent) {
     this.table = table;
+    this.tableName = TableName.valueOf(table);
     this.scansAsString = scansAsString;
     this.parent = parent;
   }
@@ -63,7 +69,8 @@ public class HBaseData implements ReadableData<Pair<ImmutableBytesWritable, Resu
   public Iterable<Pair<ImmutableBytesWritable, Result>> read(
       TaskInputOutputContext<?, ?, ?, ?> ctxt) throws IOException {
     Configuration hconf = HBaseConfiguration.create(ctxt.getConfiguration());
-    HTable htable = new HTable(hconf, table);
+    Connection connection = ConnectionFactory.createConnection(hconf);
+    Table htable = connection.getTable(getTableName());
 
     String[] scanStrings = StringUtils.getStrings(scansAsString);
     int length = scanStrings == null ? 0 : scanStrings.length;
@@ -72,6 +79,13 @@ public class HBaseData implements ReadableData<Pair<ImmutableBytesWritable, Resu
       scans[i] = HBaseSourceTarget.convertStringToScan(scanStrings[i]);
     }
 
-    return new HTableIterable(htable, scans);
+    return new HTableIterable(connection, htable, scans);
+  }
+
+  private TableName getTableName(){
+    if(tableName == null){
+      tableName = TableName.valueOf(table);
+    }
+    return tableName;
   }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/8bae517e/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
index c98436d..ede7603 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
@@ -36,9 +36,12 @@ import org.apache.crunch.types.writable.Writables;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.MultiTableInputFormat;
 import org.apache.hadoop.hbase.mapreduce.MultiTableInputFormatBase;
@@ -66,6 +69,7 @@ public class HBaseSourceTarget extends HBaseTarget implements
 
   protected Scan[] scans;
   protected String scansAsString;
+
   private FormatBundle<? extends MultiTableInputFormatBase> inputBundle;
   
   public HBaseSourceTarget(String table, Scan scan) {
@@ -75,25 +79,37 @@ public class HBaseSourceTarget extends HBaseTarget implements
   public HBaseSourceTarget(String table, Scan scan, Scan... additionalScans) {
     this(table, ObjectArrays.concat(scan, additionalScans));
   }
+
+  public HBaseSourceTarget(TableName table, Scan scan, Scan... additionalScans) {
+    this(table, ObjectArrays.concat(scan, additionalScans));
+  }
   
   public HBaseSourceTarget(String table, Scan[] scans) {
     this(table, MultiTableInputFormat.class, scans);
   }
 
+  public HBaseSourceTarget(TableName table, Scan[] scans) {
+    this(table, MultiTableInputFormat.class, scans);
+  }
+
   public HBaseSourceTarget(String table, Class<? extends MultiTableInputFormatBase> clazz,  Scan[] scans) {
-    super(table);
+    this(TableName.valueOf(table), clazz, scans);
+  }
+
+  public HBaseSourceTarget(TableName tableName, Class<? extends MultiTableInputFormatBase> clazz,  Scan[] scans) {
+    super(tableName);
     this.scans = scans;
 
     try {
 
-      byte[] tableName = Bytes.toBytes(table);
+      byte[] tableNameAsBytes = Bytes.toBytes(table);
       //Copy scans and enforce that they are for the table specified
       Scan[] tableScans = new Scan[scans.length];
       String[] scanStrings = new String[scans.length];
       for(int i = 0; i < scans.length; i++){
         tableScans[i] =  new Scan(scans[i]);
         //enforce Scan is for same table
-        tableScans[i].setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, tableName);
+        tableScans[i].setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, tableNameAsBytes);
         //Convert the Scan into a String
         scanStrings[i] = convertScanToString(tableScans[i]);
       }
@@ -190,8 +206,9 @@ public class HBaseSourceTarget extends HBaseTarget implements
   @Override
   public Iterable<Pair<ImmutableBytesWritable, Result>> read(Configuration conf) throws IOException {
     Configuration hconf = HBaseConfiguration.create(conf);
-    HTable htable = new HTable(hconf, table);
-    return new HTableIterable(htable, scans);
+    Connection connection = ConnectionFactory.createConnection(hconf);
+    Table htable = connection.getTable(getTableName());
+    return new HTableIterable(connection, htable, scans);
   }
 
   @Override
@@ -205,5 +222,4 @@ public class HBaseSourceTarget extends HBaseTarget implements
     outputConf(key, value);
     return this;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/8bae517e/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
index 7c67577..f287d5e 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
@@ -34,6 +34,7 @@ import org.apache.crunch.types.PType;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -50,10 +51,18 @@ public class HBaseTarget implements MapReduceTarget {
   private static final Logger LOG = LoggerFactory.getLogger(HBaseTarget.class);
   
   protected String table;
+
+  private transient TableName tableName;
+
   private Map<String, String> extraConf = Maps.newHashMap();
 
   public HBaseTarget(String table) {
-    this.table = table;
+    this(TableName.valueOf(table));
+  }
+
+  public HBaseTarget(TableName tableName){
+    this.tableName = tableName;
+    this.table = tableName.getNameAsString();
   }
 
   @Override
@@ -153,4 +162,11 @@ public class HBaseTarget implements MapReduceTarget {
           ptype.getTypeClass());
     }
   }
+
+  protected TableName getTableName(){
+    if(tableName == null){
+      tableName = TableName.valueOf(table);
+    }
+    return tableName;
+  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/8bae517e/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileInputFormat.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileInputFormat.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileInputFormat.java
index 7381ddd..26821bf 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileInputFormat.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileInputFormat.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@@ -185,7 +184,7 @@ public class HFileInputFormat extends FileInputFormat<NullWritable, KeyValue> {
     // to depend on it.
     private static boolean seekAtOrAfter(HFileScanner s, KeyValue k)
         throws IOException {
-      int result = s.seekTo(k.getBuffer(), k.getKeyOffset(), k.getKeyLength());
+      int result = s.seekTo(k);
       if(result < 0) {
         // Passed KV is smaller than first KV in file, work from start of file
         return s.seekTo();
@@ -206,7 +205,7 @@ public class HFileInputFormat extends FileInputFormat<NullWritable, KeyValue> {
     // Explode out directories that match the original FileInputFormat filters since HFiles are written to directories where the
     // directory name is the column name
     for (FileStatus status : super.listStatus(job)) {
-      if (status.isDir()) {
+      if (status.isDirectory()) {
         FileSystem fs = status.getPath().getFileSystem(job.getConfiguration());
         for (FileStatus match : fs.listStatus(status.getPath(), HIDDEN_FILE_FILTER)) {
           result.add(match);

http://git-wip-us.apache.org/repos/asf/crunch/blob/8bae517e/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileSource.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileSource.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileSource.java
index bd3cc8f..2240b9c 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileSource.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileSource.java
@@ -137,7 +137,7 @@ public class HFileSource extends FileSourceImpl<KeyValue> implements ReadableSou
     }
     long sum = 0;
     for (FileStatus status : statuses) {
-      if (status.isDir()) {
+      if (status.isDirectory()) {
         sum += SourceTargetHelper.getPathSize(fs, status.getPath());
       } else {
         sum += status.getLen();

http://git-wip-us.apache.org/repos/asf/crunch/blob/8bae517e/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterable.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterable.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterable.java
index a3dfc7d..c772515 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterable.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterable.java
@@ -20,9 +20,11 @@
 package org.apache.crunch.io.hbase;
 
 import org.apache.crunch.Pair;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 
 import java.io.IOException;
@@ -30,16 +32,18 @@ import java.util.Arrays;
 import java.util.Iterator;
 
 class HTableIterable implements Iterable<Pair<ImmutableBytesWritable, Result>> {
-  private final HTable table;
+  private final Table table;
   private final Scan[] scans;
+  private final Connection connection;
 
-  public HTableIterable(HTable table, Scan... scans) {
+  public HTableIterable(Connection connection, Table table, Scan... scans) {
     this.table = table;
+    this.connection = connection;
     this.scans = scans;
   }
 
   @Override
   public Iterator<Pair<ImmutableBytesWritable, Result>> iterator() {
-      return new HTableIterator(table, Arrays.asList(scans));
+      return new HTableIterator(connection, table, Arrays.asList(scans));
   }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/8bae517e/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterator.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterator.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterator.java
index 3db5897..ebef5d3 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterator.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterator.java
@@ -20,10 +20,11 @@
 package org.apache.crunch.io.hbase;
 
 import org.apache.crunch.Pair;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,13 +36,15 @@ import java.util.List;
 class HTableIterator implements Iterator<Pair<ImmutableBytesWritable, Result>> {
   private static final Logger LOG = LoggerFactory.getLogger(HTableIterator.class);
 
-  private final HTable table;
+  private final Table table;
+  private final Connection connection;
   private final Iterator<Scan> scans;
   private ResultScanner scanner;
   private Iterator<Result> iter;
 
-  public HTableIterator(HTable table, List<Scan> scans) {
+  public HTableIterator(Connection connection, Table table, List<Scan> scans) {
     this.table = table;
+    this.connection = connection;
     this.scans = scans.iterator();
     try{
       this.scanner = table.getScanner(this.scans.next());
@@ -70,6 +73,11 @@ class HTableIterator implements Iterator<Pair<ImmutableBytesWritable, Result>> {
         } catch (IOException e) {
           LOG.error("Exception closing HTable: {}", table.getName(), e);
         }
+        try {
+          connection.close();
+        } catch (IOException e) {
+          LOG.error("Exception closing HTable: {}", table.getName(), e);
+        }
       }
     }
     return hasNext;

http://git-wip-us.apache.org/repos/asf/crunch/blob/8bae517e/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/ToHBase.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/ToHBase.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/ToHBase.java
index 2c53ae1..78267cf 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/ToHBase.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/ToHBase.java
@@ -19,6 +19,7 @@ package org.apache.crunch.io.hbase;
 
 import org.apache.crunch.Target;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
 
 /**
  * Static factory methods for creating HBase {@link Target} types.
@@ -26,6 +27,10 @@ import org.apache.hadoop.fs.Path;
 public class ToHBase {
 
   public static Target table(String table) {
+    return table(TableName.valueOf(table));
+  }
+
+  public static Target table(TableName table) {
     return new HBaseTarget(table);
   }
 


Mime
View raw message