Return-Path: X-Original-To: apmail-crunch-commits-archive@www.apache.org Delivered-To: apmail-crunch-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 89650104EC for ; Wed, 28 May 2014 20:37:01 +0000 (UTC) Received: (qmail 84643 invoked by uid 500); 28 May 2014 20:37:01 -0000 Delivered-To: apmail-crunch-commits-archive@crunch.apache.org Received: (qmail 84606 invoked by uid 500); 28 May 2014 20:37:01 -0000 Mailing-List: contact commits-help@crunch.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@crunch.apache.org Delivered-To: mailing list commits@crunch.apache.org Received: (qmail 84599 invoked by uid 99); 28 May 2014 20:37:01 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 28 May 2014 20:37:01 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 3A6314E19A; Wed, 28 May 2014 20:37:01 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mkwhit@apache.org To: commits@crunch.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: git commit: CRUNCH-387: Added support for the 0.8 code stream to support multiple HBase Scans Date: Wed, 28 May 2014 20:37:01 +0000 (UTC) Repository: crunch Updated Branches: refs/heads/apache-crunch-0.8 dbec907af -> 8fe96d6cc CRUNCH-387: Added support for the 0.8 code stream to support multiple HBase Scans Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/8fe96d6c Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/8fe96d6c Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/8fe96d6c Branch: refs/heads/apache-crunch-0.8 Commit: 8fe96d6ccb7bb53a5fa0c3f011f3e6e6e8133838 Parents: dbec907 Author: Micah Whitacre Authored: Wed May 28 10:57:31 2014 -0500 Committer: Micah Whitacre Committed: Wed May 28 12:09:06 2014 -0500 ---------------------------------------------------------------------- .../crunch/io/hbase/WordCountHBaseIT.java | 13 +++++- .../org/apache/crunch/io/hbase/HBaseData.java | 17 +++++--- .../crunch/io/hbase/HBaseSourceTarget.java | 43 +++++++++++++------- .../apache/crunch/io/hbase/HTableIterable.java | 13 +++--- .../apache/crunch/io/hbase/HTableIterator.java | 35 ++++++++++++---- pom.xml | 2 +- 6 files changed, 85 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/8fe96d6c/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java index af32c1a..2bbb70b 100644 --- a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java +++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java @@ -69,6 +69,8 @@ import com.google.common.base.Joiner; import com.google.common.collect.ImmutableSet; import com.google.common.io.ByteStreams; +import javax.ws.rs.HEAD; + public class WordCountHBaseIT { static class StringifyFn extends MapFn>, String> { @@ -237,9 +239,18 @@ public class WordCountHBaseIT { key = put(inputTable, key, "cat"); key = put(inputTable, key, "cat"); key = put(inputTable, key, "dog"); + inputTable.flushCommits(); + + //Setup scan using multiple scans that simply cut the rows in half. Scan scan = new Scan(); scan.addFamily(WORD_COLFAM); - HBaseSourceTarget source = new HBaseSourceTarget(inputTableName, scan); + byte[] cutoffPoint = Bytes.toBytes(2); + scan.setStopRow(cutoffPoint); + Scan scan2 = new Scan(); + scan.addFamily(WORD_COLFAM); + scan2.setStartRow(cutoffPoint); + + HBaseSourceTarget source = new HBaseSourceTarget(inputTableName, scan, scan2); PTable words = pipeline.read(source); Map materialized = words.materializeToMap(); http://git-wip-us.apache.org/repos/asf/crunch/blob/8fe96d6c/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java index 84c39db..84de288 100644 --- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.mapreduce.TaskInputOutputContext; +import org.apache.hadoop.util.StringUtils; import java.io.IOException; import java.util.Set; @@ -35,12 +36,12 @@ import java.util.Set; public class HBaseData implements ReadableData> { private final String table; - private final String scanAsString; + private final String scansAsString; private transient SourceTarget parent; - public HBaseData(String table, String scanAsString, SourceTarget parent) { + public HBaseData(String table, String scansAsString, SourceTarget parent) { this.table = table; - this.scanAsString = scanAsString; + this.scansAsString = scansAsString; this.parent = parent; } @@ -63,7 +64,13 @@ public class HBaseData implements ReadableData ctxt) throws IOException { Configuration hconf = HBaseConfiguration.create(ctxt.getConfiguration()); HTable htable = new HTable(hconf, table); - Scan scan = HBaseSourceTarget.convertStringToScan(scanAsString); - return new HTableIterable(htable, scan); + + String[] scanStrings = StringUtils.getStrings(scansAsString); + Scan[] scans = new Scan[scanStrings.length]; + for(int i = 0; i < scanStrings.length; i++){ + scans[i] = HBaseSourceTarget.convertStringToScan(scanStrings[i]); + } + + return new HTableIterable(htable, scans); } } http://git-wip-us.apache.org/repos/asf/crunch/blob/8fe96d6c/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java index c1d7eb7..6ed3b42 100644 --- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java @@ -46,10 +46,12 @@ import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.mapreduce.TableInputFormat; +import org.apache.hadoop.hbase.mapreduce.MultiTableInputFormat; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.util.Base64; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.util.StringUtils; public class HBaseSourceTarget extends HBaseTarget implements ReadableSourceTarget>, @@ -60,16 +62,31 @@ public class HBaseSourceTarget extends HBaseTarget implements private static final PTableType PTYPE = Writables.tableOf( Writables.writables(ImmutableBytesWritable.class), Writables.writables(Result.class)); - protected Scan scan; - private FormatBundle inputBundle; + protected Scan[] scans; + protected String scansAsString; + private FormatBundle inputBundle; - public HBaseSourceTarget(String table, Scan scan) { + public HBaseSourceTarget(String table, Scan... scans) { super(table); - this.scan = scan; + this.scans = scans; + try { - this.inputBundle = FormatBundle.forInput(TableInputFormat.class) - .set(TableInputFormat.INPUT_TABLE, table) - .set(TableInputFormat.SCAN, convertScanToString(scan)); + + byte[] tableName = Bytes.toBytes(table); + //Copy scans and enforce that they are for the table specified + Scan[] tableScans = new Scan[scans.length]; + String[] scanStrings = new String[scans.length]; + for(int i = 0; i < scans.length; i++){ + tableScans[i] = new Scan(scans[i]); + //enforce Scan is for same table + tableScans[i].setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, tableName); + //Convert the Scan into a String + scanStrings[i] = convertScanToString(tableScans[i]); + } + this.scans = tableScans; + this.scansAsString = StringUtils.arrayToString(scanStrings); + this.inputBundle = FormatBundle.forInput(MultiTableInputFormat.class) + .set(MultiTableInputFormat.SCANS, scansAsString); } catch (IOException e) { throw new RuntimeException(e); } @@ -103,7 +120,7 @@ public class HBaseSourceTarget extends HBaseTarget implements @Override public int hashCode() { - return new HashCodeBuilder().append(table).append(scan).toHashCode(); + return new HashCodeBuilder().append(table).append(scansAsString).toHashCode(); } @Override @@ -161,16 +178,12 @@ public class HBaseSourceTarget extends HBaseTarget implements public Iterable> read(Configuration conf) throws IOException { Configuration hconf = HBaseConfiguration.create(conf); HTable htable = new HTable(hconf, table); - return new HTableIterable(htable, scan); + return new HTableIterable(htable, scans); } @Override public ReadableData> asReadable() { - try { - return new HBaseData(table, convertScanToString(scan), this); - } catch (IOException e) { - throw new RuntimeException(e); - } + return new HBaseData(table, scansAsString, this); } @Override http://git-wip-us.apache.org/repos/asf/crunch/blob/8fe96d6c/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterable.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterable.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterable.java index c58732c..a3dfc7d 100644 --- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterable.java +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterable.java @@ -26,23 +26,20 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import java.io.IOException; +import java.util.Arrays; import java.util.Iterator; class HTableIterable implements Iterable> { private final HTable table; - private final Scan scan; + private final Scan[] scans; - public HTableIterable(HTable table, Scan scan) { + public HTableIterable(HTable table, Scan... scans) { this.table = table; - this.scan = scan; + this.scans = scans; } @Override public Iterator> iterator() { - try { - return new HTableIterator(table, table.getScanner(scan)); - } catch (IOException e) { - throw new RuntimeException(e); - } + return new HTableIterator(table, Arrays.asList(scans)); } } http://git-wip-us.apache.org/repos/asf/crunch/blob/8fe96d6c/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterator.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterator.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterator.java index daa4a48..d679b72 100644 --- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterator.java +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterator.java @@ -25,21 +25,30 @@ import org.apache.crunch.Pair; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; import java.io.IOException; import java.util.Iterator; +import java.util.List; class HTableIterator implements Iterator> { private static final Log LOG = LogFactory.getLog(HTableIterator.class); private final HTable table; - private final ResultScanner scanner; - private final Iterator iter; + private final Iterator scans; + private ResultScanner scanner; + private Iterator iter; - public HTableIterator(HTable table, ResultScanner scanner) { + public HTableIterator(HTable table, List scans) { this.table = table; - this.scanner = scanner; + this.scans = scans.iterator(); + try{ + this.scanner = table.getScanner(this.scans.next()); + }catch(IOException ioe){ + throw new RuntimeException(ioe); + } this.iter = scanner.iterator(); } @@ -48,10 +57,20 @@ class HTableIterator implements Iterator> { boolean hasNext = iter.hasNext(); if (!hasNext) { scanner.close(); - try { - table.close(); - } catch (IOException e) { - LOG.error("Exception closing HTable: " + table.getTableName(), e); + hasNext = scans.hasNext(); + if(hasNext){ + try{ + scanner = table.getScanner(this.scans.next()); + iter = scanner.iterator(); + } catch(IOException ioe){ + throw new RuntimeException("Unable to create next scanner from "+ Bytes.toString(table.getTableName()), ioe); + } + } else { + try { + table.close(); + } catch (IOException e) { + LOG.error("Exception closing HTable: " + Bytes.toString(table.getTableName()), e); + } } } return hasNext; http://git-wip-us.apache.org/repos/asf/crunch/blob/8fe96d6c/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 4f82457..224d5a6 100644 --- a/pom.xml +++ b/pom.xml @@ -86,7 +86,7 @@ under the License. 1.9.0 org.apache.crunch 1.1.2 - 0.94.3 + 0.94.15 2.10 2.10.4