crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject [1/2] CRUNCH-449: Add a new PipelineCallable and sequentialDo method for inserting non-Crunch tasks into Crunch workflows.
Date Sat, 02 Aug 2014 16:22:44 GMT
Repository: crunch
Updated Branches:
  refs/heads/master 31d80f5d5 -> 21965a6e2


http://git-wip-us.apache.org/repos/asf/crunch/blob/21965a6e/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputCollection.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputCollection.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputCollection.java
index 0d1d5e0..237c8de 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputCollection.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputCollection.java
@@ -17,8 +17,8 @@
  */
 package org.apache.crunch.impl.spark.collect;
 
+import org.apache.crunch.ParallelDoOptions;
 import org.apache.crunch.Source;
-import org.apache.crunch.SourceTarget;
 import org.apache.crunch.impl.dist.DistributedPipeline;
 import org.apache.crunch.impl.dist.collect.BaseInputCollection;
 import org.apache.crunch.impl.mr.run.CrunchInputFormat;
@@ -26,7 +26,6 @@ import org.apache.crunch.impl.spark.SparkCollection;
 import org.apache.crunch.impl.spark.SparkRuntime;
 import org.apache.crunch.impl.spark.fn.InputConverterFunction;
 import org.apache.crunch.impl.spark.fn.MapFunction;
-import org.apache.crunch.io.impl.FileSourceImpl;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.spark.api.java.JavaPairRDD;
@@ -36,8 +35,8 @@ import java.io.IOException;
 
 public class InputCollection<S> extends BaseInputCollection<S> implements SparkCollection {
 
-  InputCollection(Source<S> source, DistributedPipeline pipeline) {
-    super(source, pipeline);
+  InputCollection(Source<S> source, DistributedPipeline pipeline, ParallelDoOptions doOpts) {
+    super(source, pipeline, doOpts);
   }
 
   public JavaRDDLike<?, ?> getJavaRDDLike(SparkRuntime runtime) {

http://git-wip-us.apache.org/repos/asf/crunch/blob/21965a6e/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputTable.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputTable.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputTable.java
index 7e8471c..606ffce 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputTable.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputTable.java
@@ -17,6 +17,7 @@
  */
 package org.apache.crunch.impl.spark.collect;
 
+import org.apache.crunch.ParallelDoOptions;
 import org.apache.crunch.TableSource;
 import org.apache.crunch.impl.dist.DistributedPipeline;
 import org.apache.crunch.impl.dist.collect.BaseInputTable;
@@ -30,8 +31,8 @@ import java.io.IOException;
 
 public class InputTable<K, V> extends BaseInputTable<K, V> implements SparkCollection {
 
-  public InputTable(TableSource<K, V> source, DistributedPipeline pipeline) {
-    super(source, pipeline);
+  public InputTable(TableSource<K, V> source, DistributedPipeline pipeline, ParallelDoOptions doOpts) {
+    super(source, pipeline, doOpts);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/crunch/blob/21965a6e/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/SparkCollectFactory.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/SparkCollectFactory.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/SparkCollectFactory.java
index 389d91c..1421945 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/SparkCollectFactory.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/SparkCollectFactory.java
@@ -43,13 +43,19 @@ import java.util.List;
 public class SparkCollectFactory implements PCollectionFactory {
 
   @Override
-  public <S> BaseInputCollection<S> createInputCollection(Source<S> source, DistributedPipeline pipeline) {
-    return new InputCollection<S>(source, pipeline);
+  public <S> BaseInputCollection<S> createInputCollection(
+      Source<S> source,
+      DistributedPipeline pipeline,
+      ParallelDoOptions doOpts) {
+    return new InputCollection<S>(source, pipeline, doOpts);
   }
 
   @Override
-  public <K, V> BaseInputTable<K, V> createInputTable(TableSource<K, V> source, DistributedPipeline pipeline) {
-    return new InputTable<K, V>(source, pipeline);
+  public <K, V> BaseInputTable<K, V> createInputTable(
+      TableSource<K, V> source,
+      DistributedPipeline pipeline,
+      ParallelDoOptions doOpts) {
+    return new InputTable<K, V>(source, pipeline, doOpts);
   }
 
   @Override


Mime
View raw message