crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject [1/2] git commit: CRUNCH-274: Add extra configuration arguments for ParallelDoOptions
Date Mon, 07 Oct 2013 06:47:36 GMT
Updated Branches:
  refs/heads/master 7a8af2865 -> d2a979ca6


CRUNCH-274: Add extra configuration arguments for ParallelDoOptions


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/9b5e1080
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/9b5e1080
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/9b5e1080

Branch: refs/heads/master
Commit: 9b5e10808a1203fe873f38a9a9224d261e95b530
Parents: 7a8af28
Author: Josh Wills <jwills@apache.org>
Authored: Sun Oct 6 12:59:26 2013 -0700
Committer: Josh Wills <jwills@apache.org>
Committed: Sun Oct 6 15:44:22 2013 -0700

----------------------------------------------------------------------
 .../java/org/apache/crunch/ConfigurationIT.java | 70 ++++++++++++++++++++
 .../org/apache/crunch/ParallelDoOptions.java    | 40 +++++++++--
 .../impl/mr/collect/DoCollectionImpl.java       |  2 +-
 .../crunch/impl/mr/collect/DoTableImpl.java     |  2 +-
 .../crunch/impl/mr/collect/PCollectionImpl.java |  8 +--
 .../impl/mr/collect/PGroupedTableImpl.java      |  2 +-
 .../org/apache/crunch/impl/mr/plan/DoNode.java  | 22 +++---
 7 files changed, 124 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/9b5e1080/crunch-core/src/it/java/org/apache/crunch/ConfigurationIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/ConfigurationIT.java b/crunch-core/src/it/java/org/apache/crunch/ConfigurationIT.java
new file mode 100644
index 0000000..0f65d8f
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/ConfigurationIT.java
@@ -0,0 +1,70 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.crunch;
+
+import junit.framework.Assert;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.From;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class ConfigurationIT {
+
+  @Rule
+  public TemporaryPath tmpDir = TemporaryPaths.create();
+
+  private static final String KEY = "key";
+
+  private static DoFn<String, String> CONFIG_FN = new DoFn<String, String>() {
+    private String value;
+
+    @Override
+    public void configure(Configuration conf) {
+      this.value = conf.get(KEY, "none");
+    }
+
+    @Override
+    public void process(String input, Emitter<String> emitter) {
+      emitter.emit(value);
+    }
+  };
+
+  @Test
+  public void testRun() throws Exception {
+    run(new MRPipeline(ConfigurationIT.class, tmpDir.getDefaultConfiguration()),
+        tmpDir.copyResourceFileName("set1.txt"), "testapalooza");
+  }
+
+  private static void run(Pipeline p, String input, String expected) throws Exception {
+    Iterable<String> mat = p.read(From.textFile(input))
+        .parallelDo("conf", CONFIG_FN, Writables.strings(), ParallelDoOptions.builder().conf(KEY, expected).build())
+        .materialize();
+    for (String v : mat) {
+      if (!expected.equals(v)) {
+        Assert.fail("Unexpected value: " + v);
+      }
+    }
+    p.done();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/9b5e1080/crunch-core/src/main/java/org/apache/crunch/ParallelDoOptions.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/ParallelDoOptions.java b/crunch-core/src/main/java/org/apache/crunch/ParallelDoOptions.java
index b02fc9c..4c5411d 100644
--- a/crunch-core/src/main/java/org/apache/crunch/ParallelDoOptions.java
+++ b/crunch-core/src/main/java/org/apache/crunch/ParallelDoOptions.java
@@ -19,9 +19,12 @@ package org.apache.crunch;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Map;
 import java.util.Set;
 
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import org.apache.hadoop.conf.Configuration;
 
 /**
  * Container class that includes optional information about a {@code parallelDo} operation
@@ -31,24 +34,39 @@ import com.google.common.collect.Sets;
  */
 public class ParallelDoOptions {
   private final Set<SourceTarget<?>> sourceTargets;
-  
-  private ParallelDoOptions(Set<SourceTarget<?>> sourceTargets) {
+  private final Map<String, String> extraConf;
+
+  private ParallelDoOptions(Set<SourceTarget<?>> sourceTargets, Map<String, String> extraConf) {
     this.sourceTargets = sourceTargets;
+    this.extraConf = extraConf;
   }
   
   public Set<SourceTarget<?>> getSourceTargets() {
     return sourceTargets;
   }
-  
+
+  /**
+   * Applies the key-value pairs that were associated with this instance to the given {@code Configuration}
+   * object. This is called just before the {@code configure} method on the {@code DoFn} corresponding to this
+   * instance is called, so it is possible for the {@code DoFn} to see (and possibly override) these settings.
+   */
+  public void configure(Configuration conf) {
+    for (Map.Entry<String, String> e : extraConf.entrySet()) {
+      conf.set(e.getKey(), e.getValue());
+    }
+  }
+
   public static Builder builder() {
     return new Builder();
   }
   
   public static class Builder {
     private Set<SourceTarget<?>> sourceTargets;
-    
+    private Map<String, String> extraConf;
+
     public Builder() {
       this.sourceTargets = Sets.newHashSet();
+      this.extraConf = Maps.newHashMap();
     }
     
     public Builder sourceTargets(SourceTarget<?>... sourceTargets) {
@@ -61,8 +79,20 @@ public class ParallelDoOptions {
       return this;
     }
 
+    /**
+     * Specifies key-value pairs that should be added to the {@code Configuration} object associated with the
+     * {@code Job} that includes these options.
+     * @param confKey The key
+     * @param confValue The value
+     * @return This {@code Builder} instance
+     */
+    public Builder conf(String confKey, String confValue) {
+      this.extraConf.put(confKey, confValue);
+      return this;
+    }
+
     public ParallelDoOptions build() {
-      return new ParallelDoOptions(sourceTargets);
+      return new ParallelDoOptions(sourceTargets, extraConf);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/9b5e1080/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java
index 917ef65..50afb75 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java
@@ -66,7 +66,7 @@ public class DoCollectionImpl<S> extends PCollectionImpl<S> {
 
   @Override
   public DoNode createDoNode() {
-    return DoNode.createFnNode(getName(), fn, ntype);
+    return DoNode.createFnNode(getName(), fn, ntype, doOptions);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/crunch/blob/9b5e1080/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java
index 5329c7a..28e2504 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java
@@ -75,7 +75,7 @@ public class DoTableImpl<K, V> extends PTableBase<K, V> implements PTable<K, V>
 
   @Override
   public DoNode createDoNode() {
-    return DoNode.createFnNode(getName(), fn, type);
+    return DoNode.createFnNode(getName(), fn, type, doOptions);
   }
 
   public boolean hasCombineFn() {

http://git-wip-us.apache.org/repos/asf/crunch/blob/9b5e1080/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
index b5f1cef..43711fc 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
@@ -55,15 +55,15 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
   private final String name;
   protected MRPipeline pipeline;
   protected SourceTarget<S> materializedAt;
-  private final ParallelDoOptions options;
+  protected final ParallelDoOptions doOptions;
   
   public PCollectionImpl(String name) {
     this(name, ParallelDoOptions.builder().build());
   }
   
-  public PCollectionImpl(String name, ParallelDoOptions options) {
+  public PCollectionImpl(String name, ParallelDoOptions doOptions) {
     this.name = name;
-    this.options = options;
+    this.doOptions = doOptions;
   }
 
   @Override
@@ -234,7 +234,7 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
   }
   
   public Set<SourceTarget<?>> getTargetDependencies() {
-    Set<SourceTarget<?>> targetDeps = options.getSourceTargets();
+    Set<SourceTarget<?>> targetDeps = doOptions.getSourceTargets();
     for (PCollectionImpl<?> parent : getParents()) {
       targetDeps = Sets.union(targetDeps, parent.getTargetDependencies());
     }

http://git-wip-us.apache.org/repos/asf/crunch/blob/9b5e1080/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
index c385e16..e0e24ed 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
@@ -146,7 +146,7 @@ public class PGroupedTableImpl<K, V> extends PCollectionImpl<Pair<K, Iterable<V>
 
   @Override
   public DoNode createDoNode() {
-    return DoNode.createFnNode(getName(), ptype.getInputMapFn(), ptype);
+    return DoNode.createFnNode(getName(), ptype.getInputMapFn(), ptype, doOptions);
   }
 
   public DoNode getGroupingNode() {

http://git-wip-us.apache.org/repos/asf/crunch/blob/9b5e1080/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java
index 87d0a5b..7c64ab4 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java
@@ -21,6 +21,7 @@ import java.util.List;
 
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.crunch.DoFn;
+import org.apache.crunch.ParallelDoOptions;
 import org.apache.crunch.Source;
 import org.apache.crunch.impl.mr.run.NodeContext;
 import org.apache.crunch.impl.mr.run.RTNode;
@@ -42,16 +43,18 @@ public class DoNode {
   private final List<DoNode> children;
   private final Converter outputConverter;
   private final Source<?> source;
+  private final ParallelDoOptions options;
   private String outputName;
 
   private DoNode(DoFn fn, String name, PType<?> ptype, List<DoNode> children, Converter outputConverter,
-      Source<?> source) {
+      Source<?> source, ParallelDoOptions options) {
     this.fn = fn;
     this.name = name;
     this.ptype = ptype;
     this.children = children;
     this.outputConverter = outputConverter;
     this.source = source;
+    this.options = options;
   }
 
   private static List<DoNode> allowsChildren() {
@@ -60,26 +63,22 @@ public class DoNode {
 
   public static <K, V> DoNode createGroupingNode(String name, PGroupedTableType<K, V> ptype) {
     DoFn<?, ?> fn = ptype.getOutputMapFn();
-    return new DoNode(fn, name, ptype, NO_CHILDREN, ptype.getGroupingConverter(), null);
+    return new DoNode(fn, name, ptype, NO_CHILDREN, ptype.getGroupingConverter(), null, null);
   }
 
   public static DoNode createOutputNode(String name, Converter outputConverter, PType<?> ptype) {
     DoFn<?, ?> fn = ptype.getOutputMapFn();
-    return new DoNode(fn, name, ptype, NO_CHILDREN, outputConverter, null);
+    return new DoNode(fn, name, ptype, NO_CHILDREN, outputConverter, null, null);
   }
 
-  public static DoNode createFnNode(String name, DoFn<?, ?> function, PType<?> ptype) {
-    return new DoNode(function, name, ptype, allowsChildren(), null, null);
+  public static DoNode createFnNode(String name, DoFn<?, ?> function, PType<?> ptype, ParallelDoOptions options) {
+    return new DoNode(function, name, ptype, allowsChildren(), null, null, options);
   }
 
   public static <S> DoNode createInputNode(Source<S> source) {
     PType<?> ptype = source.getType();
     DoFn<?, ?> fn = ptype.getInputMapFn();
-    return new DoNode(fn, source.toString(), ptype, allowsChildren(), null, source);
-  }
-
-  public boolean isInputNode() {
-    return source != null;
+    return new DoNode(fn, source.toString(), ptype, allowsChildren(), null, source, null);
   }
 
   public boolean isOutputNode() {
@@ -126,6 +125,9 @@ public class DoNode {
 
   public RTNode toRTNode(boolean inputNode, Configuration conf, NodeContext nodeContext) {
     List<RTNode> childRTNodes = Lists.newArrayList();
+    if (options != null) {
+      options.configure(conf);
+    }
     fn.configure(conf);
     for (DoNode child : children) {
       childRTNodes.add(child.toRTNode(false, conf, nodeContext));


Mime
View raw message