beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amits...@apache.org
Subject [08/50] [abbrv] incubator-beam git commit: More cleanup. View.AsSingleton is already exercised by the TfIdf test.
Date Thu, 10 Mar 2016 20:58:33 GMT
More cleanup. View.AsSingleton is already exercised by the TfIdf test.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/78d66145
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/78d66145
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/78d66145

Branch: refs/heads/master
Commit: 78d66145ed37ef82bb8cc8f1f7a4783d018cceae
Parents: 89e2bb5
Author: Tom White <tom@cloudera.com>
Authored: Tue Jun 30 09:00:04 2015 +0100
Committer: Tom White <tom@cloudera.com>
Committed: Thu Mar 10 11:15:14 2016 +0000

----------------------------------------------------------------------
 .../main/java/com/cloudera/dataflow/spark/EvaluationContext.java | 3 ++-
 .../java/com/cloudera/dataflow/spark/SparkContextFactory.java    | 2 +-
 .../java/com/cloudera/dataflow/spark/SparkProcessContext.java    | 4 +---
 .../java/com/cloudera/dataflow/spark/TransformTranslator.java    | 2 --
 4 files changed, 4 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/78d66145/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationContext.java
b/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationContext.java
index df3f7f7..5337264 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationContext.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationContext.java
@@ -193,8 +193,9 @@ public class EvaluationContext implements EvaluationResult {
     SparkContextFactory.stopSparkContext(jsc);
   }
 
+  /** The runner is blocking. */
   @Override
   public State getState() {
-    return State.UNKNOWN;
+    return State.DONE;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/78d66145/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkContextFactory.java
b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkContextFactory.java
index 483899a..b7570b3 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkContextFactory.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkContextFactory.java
@@ -59,7 +59,7 @@ final class SparkContextFactory {
   private static JavaSparkContext createSparkContext(String master) {
     SparkConf conf = new SparkConf();
     conf.setMaster(master);
-    conf.setAppName("spark pipeline job");
+    conf.setAppName("spark dataflow pipeline job");
     conf.set("spark.serializer", KryoSerializer.class.getCanonicalName());
     return new JavaSparkContext(conf);
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/78d66145/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java
b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java
index fd4e048..12fb4e0 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java
@@ -40,12 +40,11 @@ import org.slf4j.LoggerFactory;
 
 abstract class SparkProcessContext<I, O> extends DoFn<I, O>.ProcessContext {
 
-  private static final Logger LOG = LoggerFactory.getLogger(DoFnFunction.class);
+  private static final Logger LOG = LoggerFactory.getLogger(SparkProcessContext.class);
 
   private static final Collection<? extends BoundedWindow> GLOBAL_WINDOWS =
       Collections.singletonList(GlobalWindow.INSTANCE);
 
-  private final DoFn<I, O> mFunction;
   private final SparkRuntimeContext mRuntimeContext;
   private final Map<TupleTag<?>, BroadcastHelper<?>> mSideInputs;
 
@@ -55,7 +54,6 @@ abstract class SparkProcessContext<I, O> extends DoFn<I, O>.ProcessContext
{
       SparkRuntimeContext runtime,
       Map<TupleTag<?>, BroadcastHelper<?>> sideInputs) {
     fn.super();
-    this.mFunction = fn;
     this.mRuntimeContext = runtime;
     this.mSideInputs = sideInputs;
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/78d66145/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java
b/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java
index 6b78d9e..2689424 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java
@@ -539,7 +539,6 @@ public final class TransformTranslator {
     return new TransformEvaluator<View.AsSingleton<T>>() {
       @Override
       public void evaluate(View.AsSingleton<T> transform, EvaluationContext context)
{
-        // TODO: PROBABLY INCORRECT. Fix it.
         Iterable<T> input = context.get(context.getInput(transform));
         context.setPView(context.getOutput(transform), Iterables.transform(input,
             new WindowingFunction<T>()));
@@ -552,7 +551,6 @@ public final class TransformTranslator {
       @Override
       public void evaluate(View.AsIterable<T> transform, EvaluationContext context)
{
         Iterable<T> input = context.get(context.getInput(transform));
-
         context.setPView(context.getOutput(transform), Iterables.transform(input,
             new WindowingFunction<T>()));
       }


Mime
View raw message