crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject [2/2] git commit: CRUNCH-275: Support extra config args on Source, Target, and SourceTarget
Date Mon, 07 Oct 2013 06:47:37 GMT
CRUNCH-275: Support extra config args on Source, Target, and SourceTarget


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/d2a979ca
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/d2a979ca
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/d2a979ca

Branch: refs/heads/master
Commit: d2a979ca6a3cb95e2394f2ba901ca1874ffc49fa
Parents: 9b5e108
Author: Josh Wills <jwills@apache.org>
Authored: Sun Oct 6 15:29:44 2013 -0700
Committer: Josh Wills <jwills@apache.org>
Committed: Sun Oct 6 23:30:28 2013 -0700

----------------------------------------------------------------------
 .../crunch/contrib/io/jdbc/DataBaseSource.java  | 98 +++++++++-----------
 .../src/main/java/org/apache/crunch/Source.java |  8 ++
 .../java/org/apache/crunch/SourceTarget.java    |  6 ++
 .../src/main/java/org/apache/crunch/Target.java |  7 ++
 .../java/org/apache/crunch/io/FormatBundle.java |  4 +-
 .../apache/crunch/io/avro/AvroFileTarget.java   | 14 +--
 .../crunch/io/avro/trevni/TrevniKeyTarget.java  |  2 +-
 .../apache/crunch/io/impl/FileSourceImpl.java   |  8 +-
 .../apache/crunch/io/impl/FileTargetImpl.java   | 45 +++++++--
 .../apache/crunch/io/impl/SourceTargetImpl.java | 19 ++++
 .../apache/crunch/io/text/TextFileTarget.java   |  3 +-
 .../crunch/io/hbase/HBaseSourceTarget.java      | 15 +++
 .../org/apache/crunch/io/hbase/HBaseTarget.java | 16 ++++
 .../org/apache/crunch/io/hbase/HFileTarget.java | 31 +------
 14 files changed, 172 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/d2a979ca/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java
----------------------------------------------------------------------
diff --git a/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java b/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java
index 337ecb7..2c51b84 100644
--- a/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java
+++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java
@@ -17,19 +17,14 @@
  */
 package org.apache.crunch.contrib.io.jdbc;
 
-import java.io.IOException;
 import java.sql.Driver;
 
-import org.apache.crunch.Source;
-import org.apache.crunch.io.CrunchInputs;
 import org.apache.crunch.io.FormatBundle;
-import org.apache.crunch.types.Converter;
-import org.apache.crunch.types.PType;
+import org.apache.crunch.io.impl.FileSourceImpl;
 import org.apache.crunch.types.writable.Writables;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
 import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
 import org.apache.hadoop.mapreduce.lib.db.DBWritable;
@@ -44,78 +39,83 @@ import org.apache.hadoop.mapreduce.lib.db.DBWritable;
  * 
  * @param <T> The input type of this source
  */
-public class DataBaseSource<T extends DBWritable & Writable> implements Source<T> {
-
-  private Class<T> inputClass;
-  private PType<T> ptype;
-  private String driverClass;
-  private String url;
-  private String username;
-  private String password;
-  private String selectClause;
-  public String countClause;
-
-  private DataBaseSource(Class<T> inputClass) {
-    this.inputClass = inputClass;
-    this.ptype = Writables.writables(inputClass);
+public class DataBaseSource<T extends DBWritable & Writable> extends FileSourceImpl<T> {
+
+  private DataBaseSource(Class<T> inputClass,
+      String driverClassName,
+      String url,
+      String username,
+      String password,
+      String selectClause,
+      String countClause) {
+    super(
+        new Path("dbsource"),
+        Writables.writables(inputClass),
+        FormatBundle.forInput(DBInputFormat.class)
+            .set(DBConfiguration.DRIVER_CLASS_PROPERTY, driverClassName)
+            .set(DBConfiguration.URL_PROPERTY, url)
+            .set(DBConfiguration.USERNAME_PROPERTY, username)
+            .set(DBConfiguration.PASSWORD_PROPERTY, password)
+            .set(DBConfiguration.INPUT_CLASS_PROPERTY, inputClass.getCanonicalName())
+            .set(DBConfiguration.INPUT_QUERY, selectClause)
+            .set(DBConfiguration.INPUT_COUNT_QUERY, countClause));
   }
 
   static class Builder<T extends DBWritable & Writable> {
 
+    private Class<T> inputClass;
+    private String driverClass;
+    private String url;
+    private String username;
+    private String password;
+    private String selectClause;
+    public String countClause;
+
     private DataBaseSource<T> dataBaseSource;
 
     public Builder(Class<T> inputClass) {
-      this.dataBaseSource = new DataBaseSource<T>(inputClass);
+      this.inputClass = inputClass;
     }
 
     Builder<T> setDriverClass(Class<? extends Driver> driverClass) {
-      dataBaseSource.driverClass = driverClass.getName();
+      this.driverClass = driverClass.getName();
       return this;
     }
 
     Builder<T> setUrl(String url) {
-      dataBaseSource.url = url;
+      this.url = url;
       return this;
     }
 
     Builder<T> setUsername(String username) {
-      dataBaseSource.username = username;
+      this.username = username;
       return this;
     }
 
     Builder<T> setPassword(String password) {
-      dataBaseSource.password = password;
+      this.password = password;
       return this;
     }
 
     Builder<T> selectSQLQuery(String selectClause) {
-      dataBaseSource.selectClause = selectClause;
+      this.selectClause = selectClause;
       return this;
     }
 
     Builder<T> countSQLQuery(String countClause) {
-      dataBaseSource.countClause = countClause;
+      this.countClause = countClause;
       return this;
     }
 
     DataBaseSource<T> build() {
-      return dataBaseSource;
-    }
-  }
-
-  @Override
-  public void configureSource(Job job, int inputId) throws IOException {
-    Configuration configuration = job.getConfiguration();
-    DBConfiguration.configureDB(configuration, driverClass, url, username, password);
-    if (inputId == -1) {
-      job.setInputFormatClass(DBInputFormat.class);
-      DBInputFormat.setInput(job, inputClass, selectClause, countClause);
-    } else {
-      FormatBundle<DBInputFormat> bundle = FormatBundle.forInput(DBInputFormat.class)
-          .set(DBConfiguration.INPUT_CLASS_PROPERTY, inputClass.getCanonicalName())
-          .set(DBConfiguration.INPUT_QUERY, selectClause)
-          .set(DBConfiguration.INPUT_COUNT_QUERY, countClause);
-      CrunchInputs.addInputPath(job, new Path("dbsource"), bundle, inputId);
+      return new DataBaseSource<T>(
+          inputClass,
+          driverClass,
+          url,
+          username,
+          password,
+          selectClause,
+          countClause);
     }
   }
 
@@ -129,14 +129,4 @@ public class DataBaseSource<T extends DBWritable & Writable> implements Source<T
   public long getLastModifiedAt(Configuration configuration) {
     return -1;
   }
-
-  @Override
-  public PType<T> getType() {
-    return ptype;
-  }
-
-  @Override
-  public Converter<?, ?, ?, ?> getConverter() {
-    return ptype.getConverter();
-  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/d2a979ca/crunch-core/src/main/java/org/apache/crunch/Source.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/Source.java b/crunch-core/src/main/java/org/apache/crunch/Source.java
index b0a0449..b209dfc 100644
--- a/crunch-core/src/main/java/org/apache/crunch/Source.java
+++ b/crunch-core/src/main/java/org/apache/crunch/Source.java
@@ -30,6 +30,14 @@ import org.apache.hadoop.mapreduce.Job;
  * 
  */
 public interface Source<T> {
+
+  /**
+   * Adds the given key-value pair to the {@code Configuration} instance that is used to read
+   * this {@code Source<T>}. Allows for multiple inputs to re-use the same config keys with
+   * different values when necessary.
+   */
+  Source<T> inputConf(String key, String value);
+
   /**
    * Returns the {@code PType} for this source.
    */

http://git-wip-us.apache.org/repos/asf/crunch/blob/d2a979ca/crunch-core/src/main/java/org/apache/crunch/SourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/SourceTarget.java b/crunch-core/src/main/java/org/apache/crunch/SourceTarget.java
index 09c03c6..80cd730 100644
--- a/crunch-core/src/main/java/org/apache/crunch/SourceTarget.java
+++ b/crunch-core/src/main/java/org/apache/crunch/SourceTarget.java
@@ -23,4 +23,10 @@ package org.apache.crunch;
  *
  */
 public interface SourceTarget<T> extends Source<T>, Target {
+  /**
+   * Adds the given key-value pair to the {@code Configuration} instance(s) that are used to
+   * read and write this {@code SourceTarget<T>}. Allows for multiple inputs and outputs to
+   * re-use the same config keys with different values when necessary.
+   */
+  SourceTarget<T> conf(String key, String value);
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/d2a979ca/crunch-core/src/main/java/org/apache/crunch/Target.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/Target.java b/crunch-core/src/main/java/org/apache/crunch/Target.java
index 65ad67d..112c637 100644
--- a/crunch-core/src/main/java/org/apache/crunch/Target.java
+++ b/crunch-core/src/main/java/org/apache/crunch/Target.java
@@ -61,6 +61,13 @@ public interface Target {
   }
 
   /**
+   * Adds the given key-value pair to the {@code Configuration} instance that is used to write
+   * this {@code Target}. Allows for multiple target outputs to re-use the same config keys with
+   * different values when necessary.
+   */
+  Target outputConf(String key, String value);
+
+  /**
    * Apply the given {@code WriteMode} to this {@code Target} instance.
    * 
    * @param writeMode The strategy for handling existing outputs

http://git-wip-us.apache.org/repos/asf/crunch/blob/d2a979ca/crunch-core/src/main/java/org/apache/crunch/io/FormatBundle.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/FormatBundle.java b/crunch-core/src/main/java/org/apache/crunch/io/FormatBundle.java
index 4796006..aa84fee 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/FormatBundle.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/FormatBundle.java
@@ -70,8 +70,8 @@ public class FormatBundle<K> implements Serializable, Writable, Configurable {
     return new FormatBundle<T>(inputFormatClass);
   }
   
-  public static <T extends OutputFormat<?, ?>> FormatBundle<T> forOutput(Class<T> inputFormatClass) {
-    return new FormatBundle<T>(inputFormatClass);
+  public static <T extends OutputFormat<?, ?>> FormatBundle<T> forOutput(Class<T> outputFormatClass) {
+    return new FormatBundle<T>(outputFormatClass);
   }
   
   public FormatBundle() {

http://git-wip-us.apache.org/repos/asf/crunch/blob/d2a979ca/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
index ea0179f..fc82361 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
@@ -20,6 +20,7 @@ package org.apache.crunch.io.avro;
 import org.apache.avro.mapred.AvroWrapper;
 import org.apache.crunch.SourceTarget;
 import org.apache.crunch.io.FileNamingScheme;
+import org.apache.crunch.io.FormatBundle;
 import org.apache.crunch.io.OutputHandler;
 import org.apache.crunch.io.SequentialFileNamingScheme;
 import org.apache.crunch.io.impl.FileTargetImpl;
@@ -63,21 +64,16 @@ public class AvroFileTarget extends FileTargetImpl {
   @Override
   public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) {
     AvroType<?> atype = (AvroType<?>) ptype;
-    Configuration conf = job.getConfiguration();
+    FormatBundle bundle = FormatBundle.forOutput(AvroOutputFormat.class);
     String schemaParam = null;
     if (name == null) {
       schemaParam = "avro.output.schema";
     } else {
       schemaParam = "avro.output.schema." + name;
     }
-    String outputSchema = conf.get(schemaParam);
-    if (outputSchema == null) {
-      conf.set(schemaParam, atype.getSchema().toString());
-    } else if (!outputSchema.equals(atype.getSchema().toString())) {
-      throw new IllegalStateException("Avro targets must use the same output schema");
-    }
-    Avros.configureReflectDataFactory(conf);
-    configureForMapReduce(job, AvroWrapper.class, NullWritable.class, AvroOutputFormat.class,
+    bundle.set(schemaParam, atype.getSchema().toString());
+    Avros.configureReflectDataFactory(job.getConfiguration());
+    configureForMapReduce(job, AvroWrapper.class, NullWritable.class, bundle,
         outputPath, name);
   }
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/d2a979ca/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeyTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeyTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeyTarget.java
index e1f2ab1..e7acc08 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeyTarget.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeyTarget.java
@@ -83,7 +83,7 @@ public class TrevniKeyTarget extends FileTargetImpl {
       AvroJob.setMapOutputKeySchema(job, atype.getSchema());
 
       Avros.configureReflectDataFactory(conf);
-      configureForMapReduce(job, AvroKey.class, NullWritable.class, TrevniOutputFormat.class,
+      configureForMapReduce(job, AvroKey.class, NullWritable.class, FormatBundle.forOutput(TrevniOutputFormat.class),
           outputPath, name);
     } else {
       FormatBundle<TrevniOutputFormat> bundle = FormatBundle.forOutput(

http://git-wip-us.apache.org/repos/asf/crunch/blob/d2a979ca/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
index a3cbdc8..766b9b0 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
@@ -84,7 +84,13 @@ public class FileSourceImpl<T> implements Source<T> {
   public List<Path> getPaths() {
     return paths;
   }
-  
+
+  @Override
+  public Source<T> inputConf(String key, String value) {
+    inputBundle.set(key, value);
+    return this;
+  }
+
   @Override
   public Converter<?, ?, ?, ?> getConverter() {
     return ptype.getConverter();

http://git-wip-us.apache.org/repos/asf/crunch/blob/d2a979ca/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
index cbd87e3..8ae2589 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
@@ -18,17 +18,21 @@
 package org.apache.crunch.io.impl;
 
 import java.io.IOException;
+import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.crunch.CrunchRuntimeException;
 import org.apache.crunch.SourceTarget;
+import org.apache.crunch.Target;
 import org.apache.crunch.impl.mr.plan.PlanningParameters;
 import org.apache.crunch.io.CrunchOutputs;
 import org.apache.crunch.io.FileNamingScheme;
+import org.apache.crunch.io.FormatBundle;
 import org.apache.crunch.io.OutputHandler;
 import org.apache.crunch.io.PathTarget;
 import org.apache.crunch.io.SourceTargetHelper;
@@ -46,14 +50,30 @@ public class FileTargetImpl implements PathTarget {
   private static final Log LOG = LogFactory.getLog(FileTargetImpl.class);
   
   protected final Path path;
-  private final Class<? extends FileOutputFormat> outputFormatClass;
+  private final FormatBundle<? extends FileOutputFormat> formatBundle;
   private final FileNamingScheme fileNamingScheme;
 
   public FileTargetImpl(Path path, Class<? extends FileOutputFormat> outputFormatClass,
-      FileNamingScheme fileNamingScheme) {
+                        FileNamingScheme fileNamingScheme) {
+    this(path, outputFormatClass, fileNamingScheme, ImmutableMap.<String, String>of());
+  }
+
+  public FileTargetImpl(Path path, Class<? extends FileOutputFormat> outputFormatClass,
+      FileNamingScheme fileNamingScheme, Map<String, String> extraConf) {
     this.path = path;
-    this.outputFormatClass = outputFormatClass;
+    this.formatBundle = FormatBundle.forOutput(outputFormatClass);
     this.fileNamingScheme = fileNamingScheme;
+    if (extraConf != null && !extraConf.isEmpty()) {
+      for (Map.Entry<String, String> e : extraConf.entrySet()) {
+        formatBundle.set(e.getKey(), e.getValue());
+      }
+    }
+  }
+
+  @Override
+  public Target outputConf(String key, String value) {
+    formatBundle.set(key, value);
+    return this;
   }
 
   @Override
@@ -61,22 +81,29 @@ public class FileTargetImpl implements PathTarget {
     Converter converter = ptype.getConverter();
     Class keyClass = converter.getKeyClass();
     Class valueClass = converter.getValueClass();
-    configureForMapReduce(job, keyClass, valueClass, outputFormatClass, outputPath, name);
+    configureForMapReduce(job, keyClass, valueClass, formatBundle, outputPath, name);
   }
 
+  @Deprecated
   protected void configureForMapReduce(Job job, Class keyClass, Class valueClass,
       Class outputFormatClass, Path outputPath, String name) {
+    configureForMapReduce(job, keyClass, valueClass, FormatBundle.forOutput(outputFormatClass), outputPath, name);
+  }
+
+  protected void configureForMapReduce(Job job, Class keyClass, Class valueClass,
+      FormatBundle formatBundle, Path outputPath, String name) {
     try {
       FileOutputFormat.setOutputPath(job, outputPath);
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
     if (name == null) {
-      job.setOutputFormatClass(outputFormatClass);
+      job.setOutputFormatClass(formatBundle.getFormatClass());
+      formatBundle.configure(job.getConfiguration());
       job.setOutputKeyClass(keyClass);
       job.setOutputValueClass(valueClass);
     } else {
-      CrunchOutputs.addNamedOutput(job, name, outputFormatClass, keyClass, valueClass);
+      CrunchOutputs.addNamedOutput(job, name, formatBundle, keyClass, valueClass);
     }
   }
 
@@ -185,7 +212,11 @@ public class FileTargetImpl implements PathTarget {
 
   @Override
   public String toString() {
-    return new StringBuilder().append(outputFormatClass.getSimpleName()).append("(").append(path).append(")")
+    return new StringBuilder()
+        .append(formatBundle.getFormatClass().getSimpleName())
+        .append("(")
+        .append(path)
+        .append(")")
         .toString();
   }
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/d2a979ca/crunch-core/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java
index 68c9430..b15a00b 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java
@@ -40,6 +40,12 @@ class SourceTargetImpl<T> implements SourceTarget<T> {
   }
 
   @Override
+  public Source<T> inputConf(String key, String value) {
+    source.inputConf(key, value);
+    return this;
+  }
+
+  @Override
   public PType<T> getType() {
     return source.getType();
   }
@@ -87,6 +93,12 @@ class SourceTargetImpl<T> implements SourceTarget<T> {
   }
 
   @Override
+  public Target outputConf(String key, String value) {
+    target.outputConf(key, value);
+    return this;
+  }
+
+  @Override
   public boolean handleExisting(WriteMode strategy, long lastModifiedAt, Configuration conf) {
     return target.handleExisting(strategy, lastModifiedAt, conf);  
   }
@@ -105,4 +117,11 @@ class SourceTargetImpl<T> implements SourceTarget<T> {
   public Converter<?, ?, ?, ?> getConverter(PType<?> ptype) {
     return target.getConverter(ptype);
   }
+
+  @Override
+  public SourceTarget<T> conf(String key, String value) {
+    source.inputConf(key, value);
+    target.outputConf(key, value);
+    return this;
+  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/d2a979ca/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTarget.java
index 17ae7a6..4b9197b 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTarget.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTarget.java
@@ -20,6 +20,7 @@ package org.apache.crunch.io.text;
 import org.apache.avro.Schema;
 import org.apache.crunch.SourceTarget;
 import org.apache.crunch.io.FileNamingScheme;
+import org.apache.crunch.io.FormatBundle;
 import org.apache.crunch.io.SequentialFileNamingScheme;
 import org.apache.crunch.io.impl.FileTargetImpl;
 import org.apache.crunch.types.Converter;
@@ -72,7 +73,7 @@ public class TextFileTarget extends FileTargetImpl {
     Converter converter = ptype.getConverter();
     Class keyClass = converter.getKeyClass();
     Class valueClass = converter.getValueClass();
-    configureForMapReduce(job, keyClass, valueClass, getOutputFormat(ptype), outputPath, name);
+    configureForMapReduce(job, keyClass, valueClass, FormatBundle.forOutput(getOutputFormat(ptype)), outputPath, name);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/crunch/blob/d2a979ca/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
index c003e48..1b2a03e 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
@@ -26,6 +26,8 @@ import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.crunch.Pair;
+import org.apache.crunch.Source;
+import org.apache.crunch.SourceTarget;
 import org.apache.crunch.TableSource;
 import org.apache.crunch.impl.mr.run.CrunchMapper;
 import org.apache.crunch.io.CrunchInputs;
@@ -73,6 +75,12 @@ public class HBaseSourceTarget extends HBaseTarget implements
   }
 
   @Override
+  public Source<Pair<ImmutableBytesWritable, Result>> inputConf(String key, String value) {
+    inputBundle.set(key, value);
+    return this;
+  }
+
+  @Override
   public PType<Pair<ImmutableBytesWritable, Result>> getType() {
     return PTYPE;
   }
@@ -146,6 +154,13 @@ public class HBaseSourceTarget extends HBaseTarget implements
     return new HTableIterable(htable, scan);
   }
 
+  @Override
+  public SourceTarget<Pair<ImmutableBytesWritable, Result>> conf(String key, String value) {
+    inputConf(key, value);
+    outputConf(key, value);
+    return this;
+  }
+
   private static class HTableIterable implements Iterable<Pair<ImmutableBytesWritable, Result>> {
     private final HTable table;
     private final Scan scan;

http://git-wip-us.apache.org/repos/asf/crunch/blob/d2a979ca/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
index 69a260e..2c3c239 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
@@ -18,12 +18,15 @@
 package org.apache.crunch.io.hbase;
 
 import java.io.IOException;
+import java.util.Map;
 
+import com.google.common.collect.Maps;
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.crunch.CrunchRuntimeException;
 import org.apache.crunch.SourceTarget;
+import org.apache.crunch.Target;
 import org.apache.crunch.io.CrunchOutputs;
 import org.apache.crunch.io.FormatBundle;
 import org.apache.crunch.io.MapReduceTarget;
@@ -45,6 +48,7 @@ public class HBaseTarget implements MapReduceTarget {
   private static final Log LOG = LogFactory.getLog(HBaseTarget.class);
   
   protected String table;
+  private Map<String, String> extraConf = Maps.newHashMap();
 
   public HBaseTarget(String table) {
     this.table = table;
@@ -100,10 +104,16 @@ public class HBaseTarget implements MapReduceTarget {
       job.setOutputKeyClass(ImmutableBytesWritable.class);
       job.setOutputValueClass(typeClass);
       conf.set(TableOutputFormat.OUTPUT_TABLE, table);
+      for (Map.Entry<String, String> e : extraConf.entrySet()) {
+        conf.set(e.getKey(), e.getValue());
+      }
     } else {
       FormatBundle<TableOutputFormat> bundle = FormatBundle.forOutput(
           TableOutputFormat.class);
       bundle.set(TableOutputFormat.OUTPUT_TABLE, table);
+      for (Map.Entry<String, String> e : extraConf.entrySet()) {
+        bundle.set(e.getKey(), e.getValue());
+      }
       CrunchOutputs.addNamedOutput(job, name,
           bundle,
           ImmutableBytesWritable.class,
@@ -117,6 +127,12 @@ public class HBaseTarget implements MapReduceTarget {
   }
 
   @Override
+  public Target outputConf(String key, String value) {
+    extraConf.put(key, value);
+    return this;
+  }
+
+  @Override
   public boolean handleExisting(WriteMode strategy, long lastModifiedAt, Configuration conf) {
     LOG.info("HBaseTarget ignores checks for existing outputs...");
     return false;

http://git-wip-us.apache.org/repos/asf/crunch/blob/d2a979ca/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java
index 1cef4fa..0a78bd8 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java
@@ -33,7 +33,6 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 public class HFileTarget extends FileTargetImpl {
 
   private static final HColumnDescriptor DEFAULT_COLUMN_DESCRIPTOR = new HColumnDescriptor();
-  private final HColumnDescriptor hcol;
 
   public HFileTarget(String path) {
     this(new Path(path));
@@ -45,34 +44,8 @@ public class HFileTarget extends FileTargetImpl {
 
   public HFileTarget(Path path, HColumnDescriptor hcol) {
     super(path, HFileOutputFormatForCrunch.class, SequentialFileNamingScheme.getInstance());
-    this.hcol = Preconditions.checkNotNull(hcol);
-  }
-
-  @Override
-  protected void configureForMapReduce(
-      Job job,
-      Class keyClass,
-      Class valueClass,
-      Class outputFormatClass,
-      Path outputPath,
-      String name) {
-    try {
-      FileOutputFormat.setOutputPath(job, outputPath);
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-
-    String hcolStr = Hex.encodeHexString(WritableUtils.toByteArray(hcol));
-    if (name == null) {
-      job.setOutputFormatClass(HFileOutputFormatForCrunch.class);
-      job.setOutputKeyClass(keyClass);
-      job.setOutputValueClass(valueClass);
-      job.getConfiguration().set(HFileOutputFormatForCrunch.HCOLUMN_DESCRIPTOR_KEY, hcolStr);
-    } else {
-      FormatBundle<HFileOutputFormatForCrunch> bundle = FormatBundle.forOutput(HFileOutputFormatForCrunch.class);
-      bundle.set(HFileOutputFormatForCrunch.HCOLUMN_DESCRIPTOR_KEY, hcolStr);
-      CrunchOutputs.addNamedOutput(job, name, bundle, keyClass, valueClass);
-    }
+    Preconditions.checkNotNull(hcol);
+    outputConf(HFileOutputFormatForCrunch.HCOLUMN_DESCRIPTOR_KEY, Hex.encodeHexString(WritableUtils.toByteArray(hcol)));
   }
 
   @Override


Mime
View raw message