incubator-crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject [2/2] git commit: CRUNCH-41: Move HBase support into a sub-module
Date Mon, 13 Aug 2012 23:19:39 GMT
CRUNCH-41: Move HBase support into a sub-module

Move HBase code and test case to the new Maven module crunch-hbase.
Move HBase static factories to FromHBase, ToHBase, and AtHBase.
Remove unnecessary dependencies from crunch core.

Signed-off-by: jwills <jwills@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/a8691d0b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/a8691d0b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/a8691d0b

Branch: refs/heads/master
Commit: a8691d0bd193a5c514e8318d4aa927eee643e531
Parents: 7ab0da9
Author: Matthias Friedrich <matt@mafr.de>
Authored: Sun Aug 12 19:41:33 2012 +0200
Committer: jwills <jwills@apache.org>
Committed: Mon Aug 13 15:51:57 2012 -0700

----------------------------------------------------------------------
 crunch-hbase/pom.xml                               |  146 +++++++++
 .../apache/crunch/io/hbase/WordCountHBaseIT.java   |  229 +++++++++++++++
 .../org/apache/crunch/test/TemporaryPaths.java     |   40 +++
 .../java/org/apache/crunch/io/hbase/AtHBase.java   |   37 +++
 .../java/org/apache/crunch/io/hbase/FromHBase.java |   39 +++
 .../apache/crunch/io/hbase/HBaseSourceTarget.java  |  108 +++++++
 .../org/apache/crunch/io/hbase/HBaseTarget.java    |   93 ++++++
 .../java/org/apache/crunch/io/hbase/ToHBase.java   |   31 ++
 crunch/pom.xml                                     |   41 +--
 .../java/org/apache/crunch/WordCountHBaseIT.java   |  223 --------------
 crunch/src/main/java/org/apache/crunch/io/At.java  |   10 -
 .../src/main/java/org/apache/crunch/io/From.java   |   12 -
 crunch/src/main/java/org/apache/crunch/io/To.java  |    5 -
 .../apache/crunch/io/hbase/HBaseSourceTarget.java  |  108 -------
 .../org/apache/crunch/io/hbase/HBaseTarget.java    |   93 ------
 pom.xml                                            |   33 ++-
 16 files changed, 767 insertions(+), 481 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a8691d0b/crunch-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-hbase/pom.xml b/crunch-hbase/pom.xml
new file mode 100644
index 0000000..9ed06d0
--- /dev/null
+++ b/crunch-hbase/pom.xml
@@ -0,0 +1,146 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.crunch</groupId>
+    <artifactId>crunch-parent</artifactId>
+    <version>0.3.0-incubating-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>crunch-hbase</artifactId>
+  <name>Apache Crunch HBase Support</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.crunch</groupId>
+      <artifactId>crunch</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>commons-lang</groupId>
+      <artifactId>commons-lang</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.crunch</groupId>
+      <artifactId>crunch-test</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>test-compile</phase>
+            <goals>
+              <goal>copy-dependencies</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${project.build.directory}/lib</outputDirectory>
+              <excludeArtifactIds>crunch-test</excludeArtifactIds>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <!-- We put slow-running tests into src/it and run them during the
+           integration-test phase using the failsafe plugin. This way
+           developers can run unit tests conveniently from the IDE or via
+           "mvn package" from the command line without triggering time
+           consuming integration tests. -->
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>add-test-source</id>
+            <phase>validate</phase>
+            <goals>
+              <goal>add-test-source</goal>
+            </goals>
+            <configuration>
+              <sources>
+                <source>${basedir}/src/it/java</source>
+              </sources>
+            </configuration>
+          </execution>
+          <execution>
+            <id>add-test-resource</id>
+            <phase>validate</phase>
+            <goals>
+              <goal>add-test-resource</goal>
+            </goals>
+            <configuration>
+              <resources>
+                  <resource>
+                    <directory>${basedir}/src/it/resources</directory>
+                  </resource>
+              </resources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-failsafe-plugin</artifactId>
+        <configuration>
+          <testSourceDirectory>${basedir}/src/it/java</testSourceDirectory>
+        </configuration>
+        <executions>
+          <execution>
+            <goals>
+              <goal>integration-test</goal>
+              <goal>verify</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a8691d0b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
new file mode 100644
index 0000000..f13edeb
--- /dev/null
+++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.hbase;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Random;
+import java.util.jar.JarEntry;
+import java.util.jar.JarOutputStream;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.hbase.HBaseSourceTarget;
+import org.apache.crunch.io.hbase.HBaseTarget;
+import org.apache.crunch.lib.Aggregate;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.io.ByteStreams;
+
+public class WordCountHBaseIT {
+  @Rule
+  public TemporaryPath tmpDir = TemporaryPaths.create();
+
+  private static final byte[] COUNTS_COLFAM = Bytes.toBytes("cf");
+  private static final byte[] WORD_COLFAM = Bytes.toBytes("cf");
+
+  private HBaseTestingUtility hbaseTestUtil = new HBaseTestingUtility();
+
+  @SuppressWarnings("serial")
+  public static PCollection<Put> wordCount(PTable<ImmutableBytesWritable, Result> words) {
+    PTable<String, Long> counts = Aggregate.count(words.parallelDo(
+        new DoFn<Pair<ImmutableBytesWritable, Result>, String>() {
+          @Override
+          public void process(Pair<ImmutableBytesWritable, Result> row, Emitter<String> emitter) {
+            byte[] word = row.second().getValue(WORD_COLFAM, null);
+            if (word != null) {
+              emitter.emit(Bytes.toString(word));
+            }
+          }
+        }, words.getTypeFamily().strings()));
+
+    return counts.parallelDo("convert to put", new DoFn<Pair<String, Long>, Put>() {
+      @Override
+      public void process(Pair<String, Long> input, Emitter<Put> emitter) {
+        Put put = new Put(Bytes.toBytes(input.first()));
+        put.add(COUNTS_COLFAM, null, Bytes.toBytes(input.second()));
+        emitter.emit(put);
+      }
+
+    }, Writables.writables(Put.class));
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    Configuration conf = hbaseTestUtil.getConfiguration();
+    conf.set("hadoop.log.dir", tmpDir.getFileName("logs"));
+    conf.set("hadoop.tmp.dir", tmpDir.getFileName("hadoop-tmp"));
+    conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
+    conf.setInt("hbase.master.info.port", -1);
+    conf.setInt("hbase.regionserver.info.port", -1);
+    
+    // Workaround for HBASE-5711, we need to set config value dfs.datanode.data.dir.perm
+    // equal to the permissions of the temp dirs on the filesystem. These temp dirs were
+    // probably created using this process' umask. So we guess the temp dir permissions as
+    // 0777 & ~umask, and use that to set the config value.
+    try {
+      Process process = Runtime.getRuntime().exec("/bin/sh -c umask");
+      BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream()));
+      int rc = process.waitFor();
+      if(rc == 0) {
+        String umask = br.readLine();
+
+        int umaskBits = Integer.parseInt(umask, 8);
+        int permBits = 0777 & ~umaskBits;
+        String perms = Integer.toString(permBits, 8);
+
+        conf.set("dfs.datanode.data.dir.perm", perms);
+      }
+    } catch (Exception e) {
+      // ignore errors, we might not be running on POSIX, or "sh" might not be on the path
+    }
+
+    hbaseTestUtil.startMiniZKCluster();
+    hbaseTestUtil.startMiniCluster();
+    hbaseTestUtil.startMiniMapReduceCluster(1);
+
+    // For Hadoop-2.0.0, we have to do a bit more work.
+    if (TaskAttemptContext.class.isInterface()) {
+      conf = hbaseTestUtil.getConfiguration();
+      FileSystem fs = FileSystem.get(conf);
+      Path tmpPath = new Path("target", "WordCountHBaseTest-tmpDir");
+      FileSystem localFS = FileSystem.getLocal(conf);
+      for (FileStatus jarFile : localFS.listStatus(new Path("target/lib/"))) {
+        Path target = new Path(tmpPath, jarFile.getPath().getName());
+        fs.copyFromLocalFile(jarFile.getPath(), target);
+        DistributedCache.addFileToClassPath(target, conf, fs);
+      }
+
+      // Create a programmatic container for this jar.
+      JarOutputStream jos = new JarOutputStream(new FileOutputStream("WordCountHBaseIT.jar"));
+      File baseDir = new File("target/test-classes");
+      String prefix = "org/apache/crunch/io/hbase/";
+      jarUp(jos, baseDir, prefix + "WordCountHBaseIT.class");
+      jarUp(jos, baseDir, prefix + "WordCountHBaseIT$1.class");
+      jarUp(jos, baseDir, prefix + "WordCountHBaseIT$2.class");
+      jos.close();
+
+      Path target = new Path(tmpPath, "WordCountHBaseIT.jar");
+      fs.copyFromLocalFile(true, new Path("WordCountHBaseIT.jar"), target);
+      DistributedCache.addFileToClassPath(target, conf, fs);
+    }
+  }
+
+  private void jarUp(JarOutputStream jos, File baseDir, String classDir) throws IOException {
+    File file = new File(baseDir, classDir);
+    JarEntry e = new JarEntry(classDir);
+    e.setTime(file.lastModified());
+    jos.putNextEntry(e);
+    ByteStreams.copy(new FileInputStream(file), jos);
+    jos.closeEntry();
+  }
+
+  @Test
+  public void testWordCount() throws IOException {
+    run(new MRPipeline(WordCountHBaseIT.class, hbaseTestUtil.getConfiguration()));
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    hbaseTestUtil.shutdownMiniMapReduceCluster();
+    hbaseTestUtil.shutdownMiniCluster();
+    hbaseTestUtil.shutdownMiniZKCluster();
+  }
+
+  public void run(Pipeline pipeline) throws IOException {
+
+    Random rand = new Random();
+    int postFix = Math.abs(rand.nextInt());
+    String inputTableName = "crunch_words_" + postFix;
+    String outputTableName = "crunch_counts_" + postFix;
+
+    try {
+
+      HTable inputTable = hbaseTestUtil.createTable(Bytes.toBytes(inputTableName), WORD_COLFAM);
+      HTable outputTable = hbaseTestUtil.createTable(Bytes.toBytes(outputTableName), COUNTS_COLFAM);
+
+      int key = 0;
+      key = put(inputTable, key, "cat");
+      key = put(inputTable, key, "cat");
+      key = put(inputTable, key, "dog");
+      Scan scan = new Scan();
+      scan.addColumn(WORD_COLFAM, null);
+      HBaseSourceTarget source = new HBaseSourceTarget(inputTableName, scan);
+      PTable<ImmutableBytesWritable, Result> shakespeare = pipeline.read(source);
+      pipeline.write(wordCount(shakespeare), new HBaseTarget(outputTableName));
+      pipeline.done();
+
+      assertIsLong(outputTable, "cat", 2);
+      assertIsLong(outputTable, "dog", 1);
+    } finally {
+      // not quite sure...
+    }
+  }
+
+  protected int put(HTable table, int key, String value) throws IOException {
+    Put put = new Put(Bytes.toBytes(key));
+    put.add(WORD_COLFAM, null, Bytes.toBytes(value));
+    table.put(put);
+    return key + 1;
+  }
+
+  protected void assertIsLong(HTable table, String key, long i) throws IOException {
+    Get get = new Get(Bytes.toBytes(key));
+    get.addColumn(COUNTS_COLFAM, null);
+    Result result = table.get(get);
+
+    byte[] rawCount = result.getValue(COUNTS_COLFAM, null);
+    assertTrue(rawCount != null);
+    assertEquals(new Long(i), new Long(Bytes.toLong(rawCount)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a8691d0b/crunch-hbase/src/it/java/org/apache/crunch/test/TemporaryPaths.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/it/java/org/apache/crunch/test/TemporaryPaths.java b/crunch-hbase/src/it/java/org/apache/crunch/test/TemporaryPaths.java
new file mode 100644
index 0000000..97cf0de
--- /dev/null
+++ b/crunch-hbase/src/it/java/org/apache/crunch/test/TemporaryPaths.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.test;
+
+import org.apache.crunch.impl.mr.run.RuntimeParameters;
+import org.apache.hadoop.conf.Configuration;
+
+
+/**
+ * Utilities for working with {@link TemporaryPath}.
+ */
+public final class TemporaryPaths {
+
+  /**
+   * Static factory returning a {@link TemporaryPath} with adjusted
+   * {@link Configuration} properties.
+   */
+  public static TemporaryPath create() {
+    return new TemporaryPath(RuntimeParameters.TMP_DIR, "hadoop.tmp.dir");
+  }
+
+  private TemporaryPaths() {
+    // nothing
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a8691d0b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/AtHBase.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/AtHBase.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/AtHBase.java
new file mode 100644
index 0000000..33ed036
--- /dev/null
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/AtHBase.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.hbase;
+
+import org.apache.crunch.SourceTarget;
+import org.apache.hadoop.hbase.client.Scan;
+
+
+/**
+ * Static factory methods for creating HBase {@link SourceTarget} types.
+ */
+public class AtHBase {
+
+  public static HBaseSourceTarget table(String table) {
+    return table(table, new Scan());
+  }
+
+  public static HBaseSourceTarget table(String table, Scan scan) {
+    return new HBaseSourceTarget(table, scan);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a8691d0b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/FromHBase.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/FromHBase.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/FromHBase.java
new file mode 100644
index 0000000..221de9b
--- /dev/null
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/FromHBase.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.hbase;
+
+import org.apache.crunch.Source;
+import org.apache.crunch.TableSource;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+/**
+ * Static factory methods for creating HBase {@link Source} types.
+ */
+public class FromHBase {
+
+  public static TableSource<ImmutableBytesWritable, Result> table(String table) {
+    return table(table, new Scan());
+  }
+
+  public static TableSource<ImmutableBytesWritable, Result> table(String table, Scan scan) {
+    return new HBaseSourceTarget(table, scan);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a8691d0b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
new file mode 100644
index 0000000..fcb9de1
--- /dev/null
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.hbase;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.crunch.Pair;
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.TableSource;
+import org.apache.crunch.impl.mr.run.CrunchMapper;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.util.Base64;
+import org.apache.hadoop.mapreduce.Job;
+
+public class HBaseSourceTarget extends HBaseTarget implements SourceTarget<Pair<ImmutableBytesWritable, Result>>,
+    TableSource<ImmutableBytesWritable, Result> {
+
+  private static final PTableType<ImmutableBytesWritable, Result> PTYPE = Writables.tableOf(
+      Writables.writables(ImmutableBytesWritable.class), Writables.writables(Result.class));
+
+  protected Scan scan;
+
+  public HBaseSourceTarget(String table, Scan scan) {
+    super(table);
+    this.scan = scan;
+  }
+
+  @Override
+  public PType<Pair<ImmutableBytesWritable, Result>> getType() {
+    return PTYPE;
+  }
+
+  @Override
+  public PTableType<ImmutableBytesWritable, Result> getTableType() {
+    return PTYPE;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null || !(other instanceof HBaseSourceTarget)) {
+      return false;
+    }
+    HBaseSourceTarget o = (HBaseSourceTarget) other;
+    // XXX scan does not have equals method
+    return table.equals(o.table) && scan.equals(o.scan);
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder().append(table).append(scan).toHashCode();
+  }
+
+  @Override
+  public String toString() {
+    return "HBaseTable(" + table + ")";
+  }
+
+  @Override
+  public void configureSource(Job job, int inputId) throws IOException {
+    Configuration conf = job.getConfiguration();
+    job.setInputFormatClass(TableInputFormat.class);
+    job.setMapperClass(CrunchMapper.class);
+    HBaseConfiguration.addHbaseResources(conf);
+    conf.set(TableInputFormat.INPUT_TABLE, table);
+    conf.set(TableInputFormat.SCAN, convertScanToString(scan));
+    TableMapReduceUtil.addDependencyJars(job);
+  }
+
+  static String convertScanToString(Scan scan) throws IOException {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(out);
+    scan.write(dos);
+    return Base64.encodeBytes(out.toByteArray());
+  }
+
+  @Override
+  public long getSize(Configuration conf) {
+    // TODO something smarter here.
+    return 1000L * 1000L * 1000L;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a8691d0b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
new file mode 100644
index 0000000..050cff1
--- /dev/null
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.hbase;
+
+import java.io.IOException;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
+import org.apache.crunch.io.MapReduceTarget;
+import org.apache.crunch.io.OutputHandler;
+import org.apache.crunch.types.PType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
+import org.apache.hadoop.mapreduce.Job;
+
+public class HBaseTarget implements MapReduceTarget {
+
+  protected String table;
+
+  public HBaseTarget(String table) {
+    this.table = table;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (this == other)
+      return true;
+    if (other == null)
+      return false;
+    if (!other.getClass().equals(getClass()))
+      return false;
+    HBaseTarget o = (HBaseTarget) other;
+    return table.equals(o.table);
+  }
+
+  @Override
+  public int hashCode() {
+    HashCodeBuilder hcb = new HashCodeBuilder();
+    return hcb.append(table).toHashCode();
+  }
+
+  @Override
+  public String toString() {
+    return "HBaseTable(" + table + ")";
+  }
+
+  @Override
+  public boolean accept(OutputHandler handler, PType<?> ptype) {
+    if (Put.class.equals(ptype.getTypeClass())) {
+      handler.configure(this, ptype);
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) {
+    Configuration conf = job.getConfiguration();
+    HBaseConfiguration.addHbaseResources(conf);
+    job.setOutputFormatClass(TableOutputFormat.class);
+    conf.set(TableOutputFormat.OUTPUT_TABLE, table);
+    try {
+      TableMapReduceUtil.addDependencyJars(job);
+    } catch (IOException e) {
+      throw new CrunchRuntimeException(e);
+    }
+  }
+
+  @Override
+  public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a8691d0b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/ToHBase.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/ToHBase.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/ToHBase.java
new file mode 100644
index 0000000..fa6b1a3
--- /dev/null
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/ToHBase.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.hbase;
+
+import org.apache.crunch.Target;
+
+/**
+ * Static factory methods for creating HBase {@link Target} types.
+ */
+public class ToHBase {
+
+  public static Target table(String table) {
+    return new HBaseTarget(table);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a8691d0b/crunch/pom.xml
----------------------------------------------------------------------
diff --git a/crunch/pom.xml b/crunch/pom.xml
index 3902880..df41df3 100644
--- a/crunch/pom.xml
+++ b/crunch/pom.xml
@@ -57,6 +57,11 @@ under the License.
     </dependency>
 
     <dependency>
+      <groupId>commons-codec</groupId>
+      <artifactId>commons-codec</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>org.codehaus.jackson</groupId>
       <artifactId>jackson-core-asl</artifactId>
     </dependency>
@@ -94,7 +99,13 @@ under the License.
       <artifactId>crunch-test</artifactId>
       <scope>test</scope>
     </dependency>
-           
+
+    <dependency>
+      <groupId>commons-httpclient</groupId>
+      <artifactId>commons-httpclient</artifactId>
+      <scope>test</scope> <!-- only needed for LocalJobRunner -->
+    </dependency>
+
     <dependency>
       <groupId>commons-logging</groupId>
       <artifactId>commons-logging</artifactId>
@@ -110,38 +121,10 @@ under the License.
       <artifactId>slf4j-log4j12</artifactId>
     </dependency>
 
-    <dependency>
-      <groupId>org.apache.hbase</groupId>
-      <artifactId>hbase</artifactId>
-      <scope>provided</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.hbase</groupId>
-      <artifactId>hbase</artifactId>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
   </dependencies>
 
   <build>
     <plugins>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-dependency-plugin</artifactId>
-        <executions>
-          <execution>
-            <phase>test-compile</phase>
-            <goals>
-              <goal>copy-dependencies</goal>
-            </goals>
-            <configuration>
-              <outputDirectory>${project.build.directory}/lib</outputDirectory>
-              <excludeArtifactIds>crunch-test</excludeArtifactIds>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
       <!-- We put slow-running tests into src/it and run them during the
            integration-test phase using the failsafe plugin. This way
            developers can run unit tests conveniently from the IDE or via

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a8691d0b/crunch/src/it/java/org/apache/crunch/WordCountHBaseIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/WordCountHBaseIT.java b/crunch/src/it/java/org/apache/crunch/WordCountHBaseIT.java
deleted file mode 100644
index b96c125..0000000
--- a/crunch/src/it/java/org/apache/crunch/WordCountHBaseIT.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.Random;
-import java.util.jar.JarEntry;
-import java.util.jar.JarOutputStream;
-
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.io.hbase.HBaseSourceTarget;
-import org.apache.crunch.io.hbase.HBaseTarget;
-import org.apache.crunch.lib.Aggregate;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.writable.Writables;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapred.TaskAttemptContext;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-
-import com.google.common.io.ByteStreams;
-
-public class WordCountHBaseIT {
-  @Rule
-  public TemporaryPath tmpDir = TemporaryPaths.create();
-
-  private static final byte[] COUNTS_COLFAM = Bytes.toBytes("cf");
-  private static final byte[] WORD_COLFAM = Bytes.toBytes("cf");
-
-  private HBaseTestingUtility hbaseTestUtil = new HBaseTestingUtility();
-
-  @SuppressWarnings("serial")
-  public static PCollection<Put> wordCount(PTable<ImmutableBytesWritable, Result> words) {
-    PTable<String, Long> counts = Aggregate.count(words.parallelDo(
-        new DoFn<Pair<ImmutableBytesWritable, Result>, String>() {
-          @Override
-          public void process(Pair<ImmutableBytesWritable, Result> row, Emitter<String> emitter) {
-            byte[] word = row.second().getValue(WORD_COLFAM, null);
-            if (word != null) {
-              emitter.emit(Bytes.toString(word));
-            }
-          }
-        }, words.getTypeFamily().strings()));
-
-    return counts.parallelDo("convert to put", new DoFn<Pair<String, Long>, Put>() {
-      @Override
-      public void process(Pair<String, Long> input, Emitter<Put> emitter) {
-        Put put = new Put(Bytes.toBytes(input.first()));
-        put.add(COUNTS_COLFAM, null, Bytes.toBytes(input.second()));
-        emitter.emit(put);
-      }
-
-    }, Writables.writables(Put.class));
-  }
-
-  @Before
-  public void setUp() throws Exception {
-    Configuration conf = hbaseTestUtil.getConfiguration();
-    conf.set("hadoop.log.dir", tmpDir.getFileName("logs"));
-    conf.set("hadoop.tmp.dir", tmpDir.getFileName("hadoop-tmp"));
-    conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
-    conf.setInt("hbase.master.info.port", -1);
-    conf.setInt("hbase.regionserver.info.port", -1);
-    
-    // Workaround for HBASE-5711, we need to set config value dfs.datanode.data.dir.perm
-    // equal to the permissions of the temp dirs on the filesystem. These temp dirs were
-    // probably created using this process' umask. So we guess the temp dir permissions as
-    // 0777 & ~umask, and use that to set the config value.
-    try {
-      Process process = Runtime.getRuntime().exec("/bin/sh -c umask");
-      BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream()));
-      int rc = process.waitFor();
-      if(rc == 0) {
-        String umask = br.readLine();
-
-        int umaskBits = Integer.parseInt(umask, 8);
-        int permBits = 0777 & ~umaskBits;
-        String perms = Integer.toString(permBits, 8);
-
-        conf.set("dfs.datanode.data.dir.perm", perms);
-      }
-    } catch (Exception e) {
-      // ignore errors, we might not be running on POSIX, or "sh" might not be on the path
-    }
-
-    hbaseTestUtil.startMiniZKCluster();
-    hbaseTestUtil.startMiniCluster();
-    hbaseTestUtil.startMiniMapReduceCluster(1);
-
-    // For Hadoop-2.0.0, we have to do a bit more work.
-    if (TaskAttemptContext.class.isInterface()) {
-      conf = hbaseTestUtil.getConfiguration();
-      FileSystem fs = FileSystem.get(conf);
-      Path tmpPath = new Path("target", "WordCountHBaseTest-tmpDir");
-      FileSystem localFS = FileSystem.getLocal(conf);
-      for (FileStatus jarFile : localFS.listStatus(new Path("target/lib/"))) {
-        Path target = new Path(tmpPath, jarFile.getPath().getName());
-        fs.copyFromLocalFile(jarFile.getPath(), target);
-        DistributedCache.addFileToClassPath(target, conf, fs);
-      }
-
-      // Create a programmatic container for this jar.
-      JarOutputStream jos = new JarOutputStream(new FileOutputStream("WordCountHBaseIT.jar"));
-      File baseDir = new File("target/test-classes");
-      String prefix = "org/apache/crunch/";
-      jarUp(jos, baseDir, prefix + "WordCountHBaseIT.class");
-      jarUp(jos, baseDir, prefix + "WordCountHBaseIT$1.class");
-      jarUp(jos, baseDir, prefix + "WordCountHBaseIT$2.class");
-      jos.close();
-
-      Path target = new Path(tmpPath, "WordCountHBaseIT.jar");
-      fs.copyFromLocalFile(true, new Path("WordCountHBaseIT.jar"), target);
-      DistributedCache.addFileToClassPath(target, conf, fs);
-    }
-  }
-
-  private void jarUp(JarOutputStream jos, File baseDir, String classDir) throws IOException {
-    File file = new File(baseDir, classDir);
-    JarEntry e = new JarEntry(classDir);
-    e.setTime(file.lastModified());
-    jos.putNextEntry(e);
-    ByteStreams.copy(new FileInputStream(file), jos);
-    jos.closeEntry();
-  }
-
-  @Test
-  public void testWordCount() throws IOException {
-    run(new MRPipeline(WordCountHBaseIT.class, hbaseTestUtil.getConfiguration()));
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    hbaseTestUtil.shutdownMiniMapReduceCluster();
-    hbaseTestUtil.shutdownMiniCluster();
-    hbaseTestUtil.shutdownMiniZKCluster();
-  }
-
-  public void run(Pipeline pipeline) throws IOException {
-
-    Random rand = new Random();
-    int postFix = Math.abs(rand.nextInt());
-    String inputTableName = "crunch_words_" + postFix;
-    String outputTableName = "crunch_counts_" + postFix;
-
-    try {
-
-      HTable inputTable = hbaseTestUtil.createTable(Bytes.toBytes(inputTableName), WORD_COLFAM);
-      HTable outputTable = hbaseTestUtil.createTable(Bytes.toBytes(outputTableName), COUNTS_COLFAM);
-
-      int key = 0;
-      key = put(inputTable, key, "cat");
-      key = put(inputTable, key, "cat");
-      key = put(inputTable, key, "dog");
-      Scan scan = new Scan();
-      scan.addColumn(WORD_COLFAM, null);
-      HBaseSourceTarget source = new HBaseSourceTarget(inputTableName, scan);
-      PTable<ImmutableBytesWritable, Result> shakespeare = pipeline.read(source);
-      pipeline.write(wordCount(shakespeare), new HBaseTarget(outputTableName));
-      pipeline.done();
-
-      assertIsLong(outputTable, "cat", 2);
-      assertIsLong(outputTable, "dog", 1);
-    } finally {
-      // not quite sure...
-    }
-  }
-
-  protected int put(HTable table, int key, String value) throws IOException {
-    Put put = new Put(Bytes.toBytes(key));
-    put.add(WORD_COLFAM, null, Bytes.toBytes(value));
-    table.put(put);
-    return key + 1;
-  }
-
-  protected void assertIsLong(HTable table, String key, long i) throws IOException {
-    Get get = new Get(Bytes.toBytes(key));
-    get.addColumn(COUNTS_COLFAM, null);
-    Result result = table.get(get);
-
-    byte[] rawCount = result.getValue(COUNTS_COLFAM, null);
-    assertTrue(rawCount != null);
-    assertEquals(new Long(i), new Long(Bytes.toLong(rawCount)));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a8691d0b/crunch/src/main/java/org/apache/crunch/io/At.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/At.java b/crunch/src/main/java/org/apache/crunch/io/At.java
index 2d787e3..951b740 100644
--- a/crunch/src/main/java/org/apache/crunch/io/At.java
+++ b/crunch/src/main/java/org/apache/crunch/io/At.java
@@ -19,7 +19,6 @@ package org.apache.crunch.io;
 
 import org.apache.crunch.SourceTarget;
 import org.apache.crunch.io.avro.AvroFileSourceTarget;
-import org.apache.crunch.io.hbase.HBaseSourceTarget;
 import org.apache.crunch.io.seq.SeqFileSourceTarget;
 import org.apache.crunch.io.seq.SeqFileTableSourceTarget;
 import org.apache.crunch.io.text.TextFileSourceTarget;
@@ -28,7 +27,6 @@ import org.apache.crunch.types.PTypeFamily;
 import org.apache.crunch.types.avro.AvroType;
 import org.apache.crunch.types.writable.Writables;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.client.Scan;
 
 /**
  * Static factory methods for creating various {@link SourceTarget} types.
@@ -43,14 +41,6 @@ public class At {
     return new AvroFileSourceTarget<T>(path, avroType);
   }
 
-  public static HBaseSourceTarget hbaseTable(String table) {
-    return hbaseTable(table, new Scan());
-  }
-
-  public static HBaseSourceTarget hbaseTable(String table, Scan scan) {
-    return new HBaseSourceTarget(table, scan);
-  }
-
   public static <T> SeqFileSourceTarget<T> sequenceFile(String pathName, PType<T> ptype) {
     return sequenceFile(new Path(pathName), ptype);
   }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a8691d0b/crunch/src/main/java/org/apache/crunch/io/From.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/From.java b/crunch/src/main/java/org/apache/crunch/io/From.java
index c7ae022..706be23 100644
--- a/crunch/src/main/java/org/apache/crunch/io/From.java
+++ b/crunch/src/main/java/org/apache/crunch/io/From.java
@@ -20,7 +20,6 @@ package org.apache.crunch.io;
 import org.apache.crunch.Source;
 import org.apache.crunch.TableSource;
 import org.apache.crunch.io.avro.AvroFileSource;
-import org.apache.crunch.io.hbase.HBaseSourceTarget;
 import org.apache.crunch.io.impl.FileTableSourceImpl;
 import org.apache.crunch.io.seq.SeqFileSource;
 import org.apache.crunch.io.seq.SeqFileTableSourceTarget;
@@ -31,9 +30,6 @@ import org.apache.crunch.types.PTypeFamily;
 import org.apache.crunch.types.avro.AvroType;
 import org.apache.crunch.types.writable.Writables;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 
 /**
@@ -61,14 +57,6 @@ public class From {
     return new AvroFileSource<T>(path, avroType);
   }
 
-  public static TableSource<ImmutableBytesWritable, Result> hbaseTable(String table) {
-    return hbaseTable(table, new Scan());
-  }
-
-  public static TableSource<ImmutableBytesWritable, Result> hbaseTable(String table, Scan scan) {
-    return new HBaseSourceTarget(table, scan);
-  }
-
   public static <T> Source<T> sequenceFile(String pathName, PType<T> ptype) {
     return sequenceFile(new Path(pathName), ptype);
   }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a8691d0b/crunch/src/main/java/org/apache/crunch/io/To.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/To.java b/crunch/src/main/java/org/apache/crunch/io/To.java
index 3190c64..faaa4d8 100644
--- a/crunch/src/main/java/org/apache/crunch/io/To.java
+++ b/crunch/src/main/java/org/apache/crunch/io/To.java
@@ -19,7 +19,6 @@ package org.apache.crunch.io;
 
 import org.apache.crunch.Target;
 import org.apache.crunch.io.avro.AvroFileTarget;
-import org.apache.crunch.io.hbase.HBaseTarget;
 import org.apache.crunch.io.impl.FileTargetImpl;
 import org.apache.crunch.io.seq.SeqFileTarget;
 import org.apache.crunch.io.text.TextFileTarget;
@@ -48,10 +47,6 @@ public class To {
     return new AvroFileTarget(path);
   }
 
-  public static Target hbaseTable(String table) {
-    return new HBaseTarget(table);
-  }
-
   public static Target sequenceFile(String pathName) {
     return sequenceFile(new Path(pathName));
   }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a8691d0b/crunch/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java b/crunch/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
deleted file mode 100644
index fcb9de1..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.hbase;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.crunch.Pair;
-import org.apache.crunch.SourceTarget;
-import org.apache.crunch.TableSource;
-import org.apache.crunch.impl.mr.run.CrunchMapper;
-import org.apache.crunch.types.PTableType;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.types.writable.Writables;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
-import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
-import org.apache.hadoop.hbase.util.Base64;
-import org.apache.hadoop.mapreduce.Job;
-
-public class HBaseSourceTarget extends HBaseTarget implements SourceTarget<Pair<ImmutableBytesWritable, Result>>,
-    TableSource<ImmutableBytesWritable, Result> {
-
-  private static final PTableType<ImmutableBytesWritable, Result> PTYPE = Writables.tableOf(
-      Writables.writables(ImmutableBytesWritable.class), Writables.writables(Result.class));
-
-  protected Scan scan;
-
-  public HBaseSourceTarget(String table, Scan scan) {
-    super(table);
-    this.scan = scan;
-  }
-
-  @Override
-  public PType<Pair<ImmutableBytesWritable, Result>> getType() {
-    return PTYPE;
-  }
-
-  @Override
-  public PTableType<ImmutableBytesWritable, Result> getTableType() {
-    return PTYPE;
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    if (other == null || !(other instanceof HBaseSourceTarget)) {
-      return false;
-    }
-    HBaseSourceTarget o = (HBaseSourceTarget) other;
-    // XXX scan does not have equals method
-    return table.equals(o.table) && scan.equals(o.scan);
-  }
-
-  @Override
-  public int hashCode() {
-    return new HashCodeBuilder().append(table).append(scan).toHashCode();
-  }
-
-  @Override
-  public String toString() {
-    return "HBaseTable(" + table + ")";
-  }
-
-  @Override
-  public void configureSource(Job job, int inputId) throws IOException {
-    Configuration conf = job.getConfiguration();
-    job.setInputFormatClass(TableInputFormat.class);
-    job.setMapperClass(CrunchMapper.class);
-    HBaseConfiguration.addHbaseResources(conf);
-    conf.set(TableInputFormat.INPUT_TABLE, table);
-    conf.set(TableInputFormat.SCAN, convertScanToString(scan));
-    TableMapReduceUtil.addDependencyJars(job);
-  }
-
-  static String convertScanToString(Scan scan) throws IOException {
-    ByteArrayOutputStream out = new ByteArrayOutputStream();
-    DataOutputStream dos = new DataOutputStream(out);
-    scan.write(dos);
-    return Base64.encodeBytes(out.toByteArray());
-  }
-
-  @Override
-  public long getSize(Configuration conf) {
-    // TODO something smarter here.
-    return 1000L * 1000L * 1000L;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a8691d0b/crunch/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java b/crunch/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
deleted file mode 100644
index 050cff1..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.hbase;
-
-import java.io.IOException;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.crunch.SourceTarget;
-import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
-import org.apache.crunch.io.MapReduceTarget;
-import org.apache.crunch.io.OutputHandler;
-import org.apache.crunch.types.PType;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
-import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
-import org.apache.hadoop.mapreduce.Job;
-
-public class HBaseTarget implements MapReduceTarget {
-
-  protected String table;
-
-  public HBaseTarget(String table) {
-    this.table = table;
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    if (this == other)
-      return true;
-    if (other == null)
-      return false;
-    if (!other.getClass().equals(getClass()))
-      return false;
-    HBaseTarget o = (HBaseTarget) other;
-    return table.equals(o.table);
-  }
-
-  @Override
-  public int hashCode() {
-    HashCodeBuilder hcb = new HashCodeBuilder();
-    return hcb.append(table).toHashCode();
-  }
-
-  @Override
-  public String toString() {
-    return "HBaseTable(" + table + ")";
-  }
-
-  @Override
-  public boolean accept(OutputHandler handler, PType<?> ptype) {
-    if (Put.class.equals(ptype.getTypeClass())) {
-      handler.configure(this, ptype);
-      return true;
-    }
-    return false;
-  }
-
-  @Override
-  public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) {
-    Configuration conf = job.getConfiguration();
-    HBaseConfiguration.addHbaseResources(conf);
-    job.setOutputFormatClass(TableOutputFormat.class);
-    conf.set(TableOutputFormat.OUTPUT_TABLE, table);
-    try {
-      TableMapReduceUtil.addDependencyJars(job);
-    } catch (IOException e) {
-      throw new CrunchRuntimeException(e);
-    }
-  }
-
-  @Override
-  public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) {
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a8691d0b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0e44105..6fcb21f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -46,6 +46,7 @@ under the License.
 
   <modules>
     <module>crunch</module>
+    <module>crunch-hbase</module>
     <module>crunch-test</module>
     <module>crunch-examples</module>
     <module>crunch-scrunch</module>
@@ -105,6 +106,12 @@ under the License.
 
       <dependency>
         <groupId>org.apache.crunch</groupId>
+        <artifactId>crunch-hbase</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.crunch</groupId>
         <artifactId>crunch-test</artifactId>
         <version>${project.version}</version>
       </dependency>
@@ -134,6 +141,24 @@ under the License.
       </dependency>
 
       <dependency>
+        <groupId>commons-codec</groupId>
+        <artifactId>commons-codec</artifactId>
+        <version>1.4</version>
+      </dependency>
+
+      <dependency>
+        <groupId>commons-lang</groupId>
+        <artifactId>commons-lang</artifactId>
+        <version>2.4</version>
+      </dependency>
+
+      <dependency>
+        <groupId>commons-httpclient</groupId>
+        <artifactId>commons-httpclient</artifactId>
+        <version>3.0.1</version>
+      </dependency>
+
+      <dependency>
         <groupId>org.codehaus.jackson</groupId>
         <artifactId>jackson-core-asl</artifactId>
         <version>1.8.3</version>
@@ -207,8 +232,14 @@ under the License.
 
       <dependency>
         <groupId>org.slf4j</groupId>
+        <artifactId>slf4j-api</artifactId>
+        <version>1.4.3</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.slf4j</groupId>
         <artifactId>slf4j-log4j12</artifactId>
-        <version>1.6.1</version>
+        <version>1.4.3</version>
       </dependency>
 
       <dependency>


Mime
View raw message