From: amitsela@apache.org
To: commits@beam.incubator.apache.org
Date: Tue, 15 Mar 2016 18:48:16 -0000
Subject: [19/23] incubator-beam git commit: [BEAM-11] second iteration of package reorganisation

[BEAM-11] second iteration of package reorganisation

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/eb0341d4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/eb0341d4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/eb0341d4

Branch: refs/heads/master
Commit: eb0341d4df6a14d8d815ebfc51ffc0254923a8fa
Parents: 3980f44
Author: Sela
Authored: Mon Mar 14 23:48:05 2016 +0200
Committer: Sela
Committed: Tue Mar 15 20:40:18 2016 +0200

----------------------------------------------------------------------
 .../apache/beam/runners/spark/DoFnFunction.java |  97 ---
 .../beam/runners/spark/EvaluationContext.java   | 287 -------
 .../beam/runners/spark/MultiDoFnFunction.java   | 119 ---
 .../beam/runners/spark/SparkContextFactory.java |  69 --
 .../runners/spark/SparkPipelineEvaluator.java   |  55 --
 .../spark/SparkPipelineOptionsFactory.java      |  30 -
 .../spark/SparkPipelineOptionsRegistrar.java    |  30 -
 .../beam/runners/spark/SparkPipelineRunner.java |   8 +-
 .../spark/SparkPipelineRunnerRegistrar.java     |  30 -
 .../runners/spark/SparkPipelineTranslator.java  |  30 -
 .../beam/runners/spark/SparkProcessContext.java | 262 ------
 .../beam/runners/spark/SparkRuntimeContext.java | 217 -----
 .../spark/SparkStreamingPipelineOptions.java    |  41 +
 .../beam/runners/spark/TransformEvaluator.java  |  27 -
 .../beam/runners/spark/TransformTranslator.java | 808 ------------------
 .../beam/runners/spark/WindowingHelpers.java    |  62 --
 .../spark/aggregators/NamedAggregators.java     |   2 +-
 .../SparkStreamingPipelineOptions.java          |  43 -
 .../SparkStreamingPipelineOptionsFactory.java   |  30 -
 .../SparkStreamingPipelineOptionsRegistrar.java |  31 -
 .../streaming/StreamingEvaluationContext.java   | 229 ------
 .../streaming/StreamingTransformTranslator.java | 418 ----------
 .../StreamingWindowPipelineDetector.java        | 104 ---
 .../runners/spark/translation/DoFnFunction.java |  97 +++
 .../spark/translation/EvaluationContext.java    | 288 +++++++
 .../spark/translation/MultiDoFnFunction.java    | 119 +++
 .../spark/translation/SparkContextFactory.java  |  69 ++
 .../translation/SparkPipelineEvaluator.java     |  56 ++
 .../SparkPipelineOptionsFactory.java            |  31 +
 .../SparkPipelineOptionsRegistrar.java          |  31 +
 .../SparkPipelineRunnerRegistrar.java           |  31 +
 .../translation/SparkPipelineTranslator.java    |  30 +
 .../spark/translation/SparkProcessContext.java  | 262 ++++++
 .../spark/translation/SparkRuntimeContext.java  | 217 +++
 .../spark/translation/TransformEvaluator.java   |  27 +
 .../spark/translation/TransformTranslator.java  | 808 +++++++++++++++
 .../spark/translation/WindowingHelpers.java     |  62 ++
 .../SparkStreamingPipelineOptionsFactory.java   |  31 +
 .../SparkStreamingPipelineOptionsRegistrar.java |  32 +
 .../streaming/StreamingEvaluationContext.java   | 229 ++++++
 .../streaming/StreamingTransformTranslator.java | 418 ++++++++
 .../StreamingWindowPipelineDetector.java        | 104 +++
 ...ataflow.sdk.options.PipelineOptionsRegistrar |   4 +-
 ...dataflow.sdk.runners.PipelineRunnerRegistrar |   2 +-
 .../beam/runners/spark/CombineGloballyTest.java |  91 ---
 .../beam/runners/spark/CombinePerKeyTest.java   |  68 --
 .../apache/beam/runners/spark/DeDupTest.java    |   1 +
 .../beam/runners/spark/DoFnOutputTest.java      |  61 --
 .../beam/runners/spark/EmptyInputTest.java      |   1 +
 .../runners/spark/MultiOutputWordCountTest.java | 135 ----
 .../beam/runners/spark/SerializationTest.java   | 180 -----
 .../beam/runners/spark/SideEffectsTest.java     |  79 --
 .../beam/runners/spark/SimpleWordCountTest.java |   1 +
 .../spark/TestSparkPipelineOptionsFactory.java  |  37 -
 .../runners/spark/TransformTranslatorTest.java  |  98 ---
 .../runners/spark/WindowedWordCountTest.java    |  67 --
 .../beam/runners/spark/io/NumShardsTest.java    |   2 +-
 .../spark/streaming/FlattenStreamingTest.java   |  87 --
 .../spark/streaming/KafkaStreamingTest.java     | 139 ----
 .../streaming/SimpleStreamingWordCountTest.java |  76 --
 .../utils/DataflowAssertStreaming.java          |  42 -
 .../streaming/utils/EmbeddedKafkaCluster.java   | 317 --------
 .../spark/translation/CombineGloballyTest.java  |  94 +++
 .../spark/translation/CombinePerKeyTest.java    |  70 ++
 .../spark/translation/DoFnOutputTest.java       |  64 ++
 .../translation/MultiOutputWordCountTest.java   | 137 ++++
 .../spark/translation/SerializationTest.java    | 183 +++++
 .../spark/translation/SideEffectsTest.java      |  81 ++
 .../TestSparkPipelineOptionsFactory.java        |  38 +
 .../translation/TransformTranslatorTest.java    |  99 +++
 .../translation/WindowedWordCountTest.java      |  71 ++
 .../streaming/FlattenStreamingTest.java         |  88 ++
 .../streaming/KafkaStreamingTest.java           | 140 ++++
 .../streaming/SimpleStreamingWordCountTest.java |  77 ++
 .../utils/DataflowAssertStreaming.java          |  42 +
 .../streaming/utils/EmbeddedKafkaCluster.java   | 317 ++++++++
 76 files changed, 4496 insertions(+), 4464 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/main/java/org/apache/beam/runners/spark/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/DoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/DoFnFunction.java
deleted file mode 100644
index e5d4542..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/DoFnFunction.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark;
-
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import org.apache.beam.runners.spark.util.BroadcastHelper;
-import org.apache.spark.api.java.function.FlatMapFunction;
-
-/**
- * Dataflow's Do functions correspond to Spark's FlatMap functions.
- *
- * @param <I> Input element type.
- * @param <O> Output element type.
- */
-public class DoFnFunction<I, O> implements FlatMapFunction<Iterator<WindowedValue<I>>,
-    WindowedValue<O>> {
-  private final DoFn<I, O> mFunction;
-  private final SparkRuntimeContext mRuntimeContext;
-  private final Map<TupleTag<?>, BroadcastHelper<?>> mSideInputs;
-
-  /**
-   * @param fn         DoFunction to be wrapped.
-   * @param runtime    Runtime to apply function in.
-   * @param sideInputs Side inputs used in DoFunction.
-   */
-  public DoFnFunction(DoFn<I, O> fn,
-                      SparkRuntimeContext runtime,
-                      Map<TupleTag<?>, BroadcastHelper<?>> sideInputs) {
-    this.mFunction = fn;
-    this.mRuntimeContext = runtime;
-    this.mSideInputs = sideInputs;
-  }
-
-  @Override
-  public Iterable<WindowedValue<O>> call(Iterator<WindowedValue<I>> iter) throws
-      Exception {
-    ProcCtxt ctxt = new ProcCtxt(mFunction, mRuntimeContext, mSideInputs);
-    ctxt.setup();
-    mFunction.startBundle(ctxt);
-    return ctxt.getOutputIterable(iter, mFunction);
-  }
-
-  private class ProcCtxt extends SparkProcessContext<I, O, WindowedValue<O>> {
-
-    private final List<WindowedValue<O>> outputs = new LinkedList<>();
-
-    ProcCtxt(DoFn<I, O> fn, SparkRuntimeContext runtimeContext, Map<TupleTag<?>,
-        BroadcastHelper<?>> sideInputs) {
-      super(fn, runtimeContext, sideInputs);
-    }
-
-    @Override
-    public synchronized void output(O o) {
-      outputs.add(windowedValue != null ? windowedValue.withValue(o) :
-          WindowedValue.valueInEmptyWindows(o));
-    }
-
-    @Override
-    public synchronized void output(WindowedValue<O> o) {
-      outputs.add(o);
-    }
-
-    @Override
-    protected void clearOutput() {
-      outputs.clear();
-    }
-
-    @Override
-    protected Iterator<WindowedValue<O>> getOutputIterator() {
-      return outputs.iterator();
-    }
-  }
-
-}
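
DoFnFunction follows Spark's partition-at-a-time contract: call() receives an Iterator over a whole partition and returns an Iterable of outputs, which is what makes the lazy ProcCtxt iterator possible. For orientation, a minimal sketch of the same contract against the Spark 1.x Java API used here, with a hypothetical UpperCaseFn standing in for the Beam plumbing (not the runner's code):

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    import org.apache.spark.api.java.function.FlatMapFunction;

    // Hypothetical: same Iterator-in/Iterable-out shape as DoFnFunction,
    // minus the DoFn, side inputs, and windowing.
    public class UpperCaseFn implements FlatMapFunction<Iterator<String>, String> {
      @Override
      public Iterable<String> call(Iterator<String> partition) {
        List<String> out = new ArrayList<>();
        while (partition.hasNext()) {
          out.add(partition.next().toUpperCase()); // one output per input
        }
        return out; // eager; ProcCtxt returns a lazy Iterable instead
      }
    }

Such a function would be applied with rdd.mapPartitions(new UpperCaseFn()); buffering into a List is the eager variant of what getOutputIterable does lazily per element.
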
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/main/java/org/apache/beam/runners/spark/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/EvaluationContext.java
deleted file mode 100644
index ad49528..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/EvaluationContext.java
+++ /dev/null
@@ -1,287 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.runners.AggregatorRetrievalException;
-import com.google.cloud.dataflow.sdk.runners.AggregatorValues;
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.cloud.dataflow.sdk.values.POutput;
-import com.google.cloud.dataflow.sdk.values.PValue;
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-import org.apache.beam.runners.spark.coders.CoderHelpers;
-import org.apache.spark.api.java.JavaRDDLike;
-import org.apache.spark.api.java.JavaSparkContext;
-
-
-/**
- * Evaluation context allows us to define how pipeline instructions are evaluated.
- */
-public class EvaluationContext implements EvaluationResult {
-  private final JavaSparkContext jsc;
-  private final Pipeline pipeline;
-  private final SparkRuntimeContext runtime;
-  private final Map<PValue, RDDHolder<?>> pcollections = new LinkedHashMap<>();
-  private final Set<RDDHolder<?>> leafRdds = new LinkedHashSet<>();
-  private final Set<PValue> multireads = new LinkedHashSet<>();
-  private final Map<PValue, Object> pobjects = new LinkedHashMap<>();
-  private final Map<PValue, Iterable<? extends WindowedValue<?>>> pview = new LinkedHashMap<>();
-  protected AppliedPTransform<?, ?, ?> currentTransform;
-
-  public EvaluationContext(JavaSparkContext jsc, Pipeline pipeline) {
-    this.jsc = jsc;
-    this.pipeline = pipeline;
-    this.runtime = new SparkRuntimeContext(jsc, pipeline);
-  }
-
-  /**
-   * Holds an RDD or values for deferred conversion to an RDD if needed. PCollections are
-   * sometimes created from a collection of objects (using RDD parallelize) and then
-   * only used to create View objects; in which case they do not need to be
-   * converted to bytes since they are not transferred across the network until they are
-   * broadcast.
-   */
-  private class RDDHolder<T> {
-
-    private Iterable<T> values;
-    private Coder<T> coder;
-    private JavaRDDLike<WindowedValue<T>, ?> rdd;
-
-    RDDHolder(Iterable<T> values, Coder<T> coder) {
-      this.values = values;
-      this.coder = coder;
-    }
-
-    RDDHolder(JavaRDDLike<WindowedValue<T>, ?> rdd) {
-      this.rdd = rdd;
-    }
-
-    JavaRDDLike<WindowedValue<T>, ?> getRDD() {
-      if (rdd == null) {
-        Iterable<WindowedValue<T>> windowedValues = Iterables.transform(values,
-            new Function<T, WindowedValue<T>>() {
-              @Override
-              public WindowedValue<T> apply(T t) {
-                // TODO: this is wrong if T is a TimestampedValue
-                return WindowedValue.valueInEmptyWindows(t);
-              }
-            });
-        WindowedValue.ValueOnlyWindowedValueCoder<T> windowCoder =
-            WindowedValue.getValueOnlyCoder(coder);
-        rdd = jsc.parallelize(CoderHelpers.toByteArrays(windowedValues, windowCoder))
-            .map(CoderHelpers.fromByteFunction(windowCoder));
-      }
-      return rdd;
-    }
-
-    Iterable<T> getValues(PCollection<T> pcollection) {
-      if (values == null) {
-        coder = pcollection.getCoder();
-        JavaRDDLike<byte[], ?> bytesRDD = rdd.map(WindowingHelpers.<T>unwindowFunction())
-            .map(CoderHelpers.toByteFunction(coder));
-        List<byte[]> clientBytes = bytesRDD.collect();
-        values = Iterables.transform(clientBytes, new Function<byte[], T>() {
-          @Override
-          public T apply(byte[] bytes) {
-            return CoderHelpers.fromByteArray(bytes, coder);
-          }
-        });
-      }
-      return values;
-    }
-
-    Iterable<WindowedValue<T>> getWindowedValues(PCollection<T> pcollection) {
-      return Iterables.transform(get(pcollection), new Function<T, WindowedValue<T>>() {
-        @Override
-        public WindowedValue<T> apply(T t) {
-          return WindowedValue.valueInEmptyWindows(t); // TODO: not the right place?
-        }
-      });
-    }
-  }
-
-  protected JavaSparkContext getSparkContext() {
-    return jsc;
-  }
-
-  protected Pipeline getPipeline() {
-    return pipeline;
-  }
-
-  protected SparkRuntimeContext getRuntimeContext() {
-    return runtime;
-  }
-
-  protected void setCurrentTransform(AppliedPTransform<?, ?, ?> transform) {
-    this.currentTransform = transform;
-  }
-
-  protected AppliedPTransform<?, ?, ?> getCurrentTransform() {
-    return currentTransform;
-  }
-
-  protected <I extends PInput> I getInput(PTransform<I, ?> transform) {
-    checkArgument(currentTransform != null && currentTransform.getTransform() == transform,
-        "can only be called with current transform");
-    @SuppressWarnings("unchecked")
-    I input = (I) currentTransform.getInput();
-    return input;
-  }
-
-  protected <O extends POutput> O getOutput(PTransform<?, O> transform) {
-    checkArgument(currentTransform != null && currentTransform.getTransform() == transform,
-        "can only be called with current transform");
-    @SuppressWarnings("unchecked")
-    O output = (O) currentTransform.getOutput();
-    return output;
-  }
-
-  protected void setOutputRDD(PTransform<?, ?> transform,
-      JavaRDDLike<WindowedValue<?>, ?> rdd) {
-    setRDD((PValue) getOutput(transform), rdd);
-  }
-
-  protected <T> void setOutputRDDFromValues(PTransform<?, ?> transform, Iterable<T> values,
-      Coder<T> coder) {
-    pcollections.put((PValue) getOutput(transform), new RDDHolder<>(values, coder));
-  }
-
-  void setPView(PValue view, Iterable<? extends WindowedValue<?>> value) {
-    pview.put(view, value);
-  }
-
-  protected boolean hasOutputRDD(PTransform<? extends PInput, ?> transform) {
-    PValue pvalue = (PValue) getOutput(transform);
-    return pcollections.containsKey(pvalue);
-  }
-
-  protected JavaRDDLike<?, ?> getRDD(PValue pvalue) {
-    RDDHolder<?> rddHolder = pcollections.get(pvalue);
-    JavaRDDLike<?, ?> rdd = rddHolder.getRDD();
-    leafRdds.remove(rddHolder);
-    if (multireads.contains(pvalue)) {
-      // Ensure the RDD is marked as cached
-      rdd.rdd().cache();
-    } else {
-      multireads.add(pvalue);
-    }
-    return rdd;
-  }
-
-  protected void setRDD(PValue pvalue, JavaRDDLike<WindowedValue<?>, ?> rdd) {
-    try {
-      rdd.rdd().setName(pvalue.getName());
-    } catch (IllegalStateException e) {
-      // name not set, ignore
-    }
-    RDDHolder<?> rddHolder = new RDDHolder<>(rdd);
-    pcollections.put(pvalue, rddHolder);
-    leafRdds.add(rddHolder);
-  }
-
-  JavaRDDLike<?, ?> getInputRDD(PTransform<? extends PInput, ?> transform) {
-    return getRDD((PValue) getInput(transform));
-  }
-
-
-  Iterable<? extends WindowedValue<?>> getPCollectionView(PCollectionView<?> view) {
-    return pview.get(view);
-  }
-
-  /**
-   * Computes the outputs for all RDDs that are leaves in the DAG and do not have any
-   * actions (like saving to a file) registered on them (i.e. they are performed for side
-   * effects).
-   */
-  protected void computeOutputs() {
-    for (RDDHolder<?> rddHolder : leafRdds) {
-      JavaRDDLike<?, ?> rdd = rddHolder.getRDD();
-      rdd.rdd().cache(); // cache so that any subsequent get() is cheap
-      rdd.count(); // force the RDD to be computed
-    }
-  }
-
-  @Override
-  public <T> T get(PValue value) {
-    if (pobjects.containsKey(value)) {
-      @SuppressWarnings("unchecked")
-      T result = (T) pobjects.get(value);
-      return result;
-    }
-    if (pcollections.containsKey(value)) {
-      JavaRDDLike<?, ?> rdd = pcollections.get(value).getRDD();
-      @SuppressWarnings("unchecked")
-      T res = (T) Iterables.getOnlyElement(rdd.collect());
-      pobjects.put(value, res);
-      return res;
-    }
-    throw new IllegalStateException("Cannot resolve un-known PObject: " + value);
-  }
-
-  @Override
-  public <T> T getAggregatorValue(String named, Class<T> resultType) {
-    return runtime.getAggregatorValue(named, resultType);
-  }
-
-  @Override
-  public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator)
-      throws AggregatorRetrievalException {
-    return runtime.getAggregatorValues(aggregator);
-  }
-
-  @Override
-  public <T> Iterable<T> get(PCollection<T> pcollection) {
-    @SuppressWarnings("unchecked")
-    RDDHolder<T> rddHolder = (RDDHolder<T>) pcollections.get(pcollection);
-    return rddHolder.getValues(pcollection);
-  }
-
-  <T> Iterable<WindowedValue<T>> getWindowedValues(PCollection<T> pcollection) {
-    @SuppressWarnings("unchecked")
-    RDDHolder<T> rddHolder = (RDDHolder<T>) pcollections.get(pcollection);
-    return rddHolder.getWindowedValues(pcollection);
-  }
-
-  @Override
-  public void close() {
-    SparkContextFactory.stopSparkContext(jsc);
-  }
-
-  /** The runner is blocking. */
-  @Override
-  public State getState() {
-    return State.DONE;
-  }
-}
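
RDDHolder converts between object form and RDD form by round-tripping elements through their Coder, so only bytes cross the network. A sketch of that round-trip for a single element, assuming the Dataflow SDK's CoderUtils helpers (the per-element analogue of the CoderHelpers.toByteArrays/fromByteFunction calls above):

    import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
    import com.google.cloud.dataflow.sdk.util.CoderUtils;

    public class CoderRoundTrip {
      public static void main(String[] args) throws Exception {
        StringUtf8Coder coder = StringUtf8Coder.of();
        // Encode before parallelize/shipping, decode on the other side.
        byte[] bytes = CoderUtils.encodeToByteArray(coder, "hello");
        String back = CoderUtils.decodeFromByteArray(coder, bytes);
        System.out.println(back); // prints: hello
      }
    }
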
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/main/java/org/apache/beam/runners/spark/MultiDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/MultiDoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/MultiDoFnFunction.java
deleted file mode 100644
index 47433a6..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/MultiDoFnFunction.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark;
-
-import java.util.Iterator;
-import java.util.Map;
-
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import com.google.common.base.Function;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.LinkedListMultimap;
-import com.google.common.collect.Multimap;
-import org.apache.beam.runners.spark.util.BroadcastHelper;
-import org.apache.spark.api.java.function.PairFlatMapFunction;
-import org.joda.time.Instant;
-import scala.Tuple2;
-
-/**
- * DoFunctions ignore side outputs. MultiDoFunctions deal with side outputs by enriching the
- * underlying data with multiple TupleTags.
- *
- * @param <I> Input type for DoFunction.
- * @param <O> Output type for DoFunction.
- */
-class MultiDoFnFunction<I, O>
-    implements PairFlatMapFunction<Iterator<WindowedValue<I>>, TupleTag<?>, WindowedValue<?>> {
-  private final DoFn<I, O> mFunction;
-  private final SparkRuntimeContext mRuntimeContext;
-  private final TupleTag<O> mMainOutputTag;
-  private final Map<TupleTag<?>, BroadcastHelper<?>> mSideInputs;
-
-  MultiDoFnFunction(
-      DoFn<I, O> fn,
-      SparkRuntimeContext runtimeContext,
-      TupleTag<O> mainOutputTag,
-      Map<TupleTag<?>, BroadcastHelper<?>> sideInputs) {
-    this.mFunction = fn;
-    this.mRuntimeContext = runtimeContext;
-    this.mMainOutputTag = mainOutputTag;
-    this.mSideInputs = sideInputs;
-  }
-
-  @Override
-  public Iterable<Tuple2<TupleTag<?>, WindowedValue<?>>>
-      call(Iterator<WindowedValue<I>> iter) throws Exception {
-    ProcCtxt ctxt = new ProcCtxt(mFunction, mRuntimeContext, mSideInputs);
-    mFunction.startBundle(ctxt);
-    ctxt.setup();
-    return ctxt.getOutputIterable(iter, mFunction);
-  }
-
-  private class ProcCtxt extends SparkProcessContext<I, O, Tuple2<TupleTag<?>, WindowedValue<?>>> {
-
-    private final Multimap<TupleTag<?>, WindowedValue<?>> outputs = LinkedListMultimap.create();
-
-    ProcCtxt(DoFn<I, O> fn, SparkRuntimeContext runtimeContext, Map<TupleTag<?>,
-        BroadcastHelper<?>> sideInputs) {
-      super(fn, runtimeContext, sideInputs);
-    }
-
-    @Override
-    public synchronized void output(O o) {
-      outputs.put(mMainOutputTag, windowedValue.withValue(o));
-    }
-
-    @Override
-    public synchronized void output(WindowedValue<O> o) {
-      outputs.put(mMainOutputTag, o);
-    }
-
-    @Override
-    public synchronized <T> void sideOutput(TupleTag<T> tag, T t) {
-      outputs.put(tag, windowedValue.withValue(t));
-    }
-
-    @Override
-    public <T> void sideOutputWithTimestamp(TupleTag<T> tupleTag, T t, Instant instant) {
-      outputs.put(tupleTag, WindowedValue.of(t, instant,
-          windowedValue.getWindows(), windowedValue.getPane()));
-    }
-
-    @Override
-    protected void clearOutput() {
-      outputs.clear();
-    }
-
-    @Override
-    protected Iterator<Tuple2<TupleTag<?>, WindowedValue<?>>> getOutputIterator() {
-      return Iterators.transform(outputs.entries().iterator(),
-          new Function<Map.Entry<TupleTag<?>, WindowedValue<?>>,
-              Tuple2<TupleTag<?>, WindowedValue<?>>>() {
-            @Override
-            public Tuple2<TupleTag<?>, WindowedValue<?>> apply(Map.Entry<TupleTag<?>,
-                WindowedValue<?>> input) {
-              return new Tuple2<TupleTag<?>, WindowedValue<?>>(input.getKey(), input.getValue());
-            }
-          });
-    }
-
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkContextFactory.java
deleted file mode 100644
index 4393a75..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkContextFactory.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.serializer.KryoSerializer;
-
-final class SparkContextFactory {
-
-  /**
-   * If the property {@code dataflow.spark.test.reuseSparkContext} is set to
-   * {@code true} then the Spark context will be reused for dataflow pipelines.
-   * This property should only be enabled for tests.
-   */
-  static final String TEST_REUSE_SPARK_CONTEXT =
-      "dataflow.spark.test.reuseSparkContext";
-  private static JavaSparkContext sparkContext;
-  private static String sparkMaster;
-
-  private SparkContextFactory() {
-  }
-
-  static synchronized JavaSparkContext getSparkContext(String master, String appName) {
-    if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT)) {
-      if (sparkContext == null) {
-        sparkContext = createSparkContext(master, appName);
-        sparkMaster = master;
-      } else if (!master.equals(sparkMaster)) {
-        throw new IllegalArgumentException(String.format("Cannot reuse spark context " +
-            "with different spark master URL. Existing: %s, requested: %s.",
-            sparkMaster, master));
-      }
-      return sparkContext;
-    } else {
-      return createSparkContext(master, appName);
-    }
-  }
-
-  static synchronized void stopSparkContext(JavaSparkContext context) {
-    if (!Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT)) {
-      context.stop();
-    }
-  }
-
-  private static JavaSparkContext createSparkContext(String master, String appName) {
-    SparkConf conf = new SparkConf();
-    conf.setMaster(master);
-    conf.setAppName(appName);
-    conf.set("spark.serializer", KryoSerializer.class.getCanonicalName());
-    return new JavaSparkContext(conf);
-  }
-}
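
SparkContextFactory keys context reuse off a JVM system property, so a test suite can share one JavaSparkContext across pipelines. A minimal sketch of opting in; the property name is the TEST_REUSE_SPARK_CONTEXT constant above, and the class name is hypothetical:

    public class ReuseContextSetup {
      public static void main(String[] args) {
        // Must be set before the first getSparkContext(master, appName) call;
        // afterwards stopSparkContext(...) leaves the shared context running.
        System.setProperty("dataflow.spark.test.reuseSparkContext", "true");
      }
    }

Note that the factory refuses to reuse a context under a different master URL and throws IllegalArgumentException instead.
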
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineEvaluator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineEvaluator.java
deleted file mode 100644
index becf15a..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineEvaluator.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark;
-
-import com.google.cloud.dataflow.sdk.runners.TransformTreeNode;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.cloud.dataflow.sdk.values.POutput;
-
-/**
- * Pipeline {@link SparkPipelineRunner.Evaluator} for Spark.
- */
-public final class SparkPipelineEvaluator extends SparkPipelineRunner.Evaluator {
-
-  private final EvaluationContext ctxt;
-
-  public SparkPipelineEvaluator(EvaluationContext ctxt, SparkPipelineTranslator translator) {
-    super(translator);
-    this.ctxt = ctxt;
-  }
-
-  @Override
-  protected <PT extends PTransform<? super PInput, POutput>> void doVisitTransform(TransformTreeNode
-      node) {
-    @SuppressWarnings("unchecked")
-    PT transform = (PT) node.getTransform();
-    @SuppressWarnings("unchecked")
-    Class<PT> transformClass = (Class<PT>) (Class<?>) transform.getClass();
-    @SuppressWarnings("unchecked") TransformEvaluator<PT> evaluator =
-        (TransformEvaluator<PT>) translator.translate(transformClass);
-    LOG.info("Evaluating {}", transform);
-    AppliedPTransform<PInput, POutput, PT> appliedTransform =
-        AppliedPTransform.of(node.getFullName(), node.getInput(), node.getOutput(), transform);
-    ctxt.setCurrentTransform(appliedTransform);
-    evaluator.evaluate(transform, ctxt);
-    ctxt.setCurrentTransform(null);
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptionsFactory.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptionsFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptionsFactory.java
deleted file mode 100644
index 9bff013..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptionsFactory.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark;
-
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-
-public final class SparkPipelineOptionsFactory {
-  private SparkPipelineOptionsFactory() {
-  }
-
-  public static SparkPipelineOptions create() {
-    return PipelineOptionsFactory.as(SparkPipelineOptions.class);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptionsRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptionsRegistrar.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptionsRegistrar.java
deleted file mode 100644
index c68af64..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptionsRegistrar.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark;
-
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar;
-import com.google.common.collect.ImmutableList;
-
-public class SparkPipelineOptionsRegistrar implements PipelineOptionsRegistrar {
-  @Override
-  public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
-    return ImmutableList.<Class<? extends PipelineOptions>>of(SparkPipelineOptions.class);
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
index b1a402f..d5e4186 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
@@ -28,10 +28,10 @@ import com.google.cloud.dataflow.sdk.values.PInput;
 import com.google.cloud.dataflow.sdk.values.POutput;
 import com.google.cloud.dataflow.sdk.values.PValue;
 
-import org.apache.beam.runners.spark.streaming.SparkStreamingPipelineOptions;
-import org.apache.beam.runners.spark.streaming.StreamingEvaluationContext;
-import org.apache.beam.runners.spark.streaming.StreamingTransformTranslator;
-import org.apache.beam.runners.spark.streaming.StreamingWindowPipelineDetector;
+import org.apache.beam.runners.spark.translation.*;
+import org.apache.beam.runners.spark.translation.streaming.StreamingEvaluationContext;
+import org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator;
+import org.apache.beam.runners.spark.translation.streaming.StreamingWindowPipelineDetector;
 
 import org.apache.spark.SparkException;
 import org.apache.spark.api.java.JavaSparkContext;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunnerRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunnerRegistrar.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunnerRegistrar.java
deleted file mode 100644
index 7861685..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunnerRegistrar.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark;
-
-import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
-import com.google.cloud.dataflow.sdk.runners.PipelineRunnerRegistrar;
-import com.google.common.collect.ImmutableList;
-
-public class SparkPipelineRunnerRegistrar implements PipelineRunnerRegistrar {
-  @Override
-  public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
-    return ImmutableList.<Class<? extends PipelineRunner<?>>>of(SparkPipelineRunner.class);
-  }
-}
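
The two registrars exist so that ServiceLoader, via the META-INF/services entries touched by this commit (see the ...PipelineOptionsRegistrar and ...PipelineRunnerRegistrar lines in the diffstat), can discover the Spark options and runner without explicit wiring. A sketch of what that enables, assuming the standard PipelineOptionsFactory entry point; the class name is hypothetical:

    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.runners.spark.SparkPipelineOptions;

    public class ParseSparkOptions {
      public static void main(String[] args) {
        // Spark-specific flags parse because the registrar is on the classpath.
        SparkPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).as(SparkPipelineOptions.class);
        System.out.println(options.getAppName());
      }
    }
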
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineTranslator.java
deleted file mode 100644
index 2e38a07..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineTranslator.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark;
-
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-
-/**
- * Translator to support translation between Dataflow transformations and Spark transformations.
- */
-public interface SparkPipelineTranslator {
-
-  boolean hasTranslation(Class<? extends PTransform<?, ?>> clazz);
-
-  <PT extends PTransform<?, ?>> TransformEvaluator<PT> translate(Class<PT> clazz);
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkProcessContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkProcessContext.java
deleted file mode 100644
index b3a720d..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkProcessContext.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
-import com.google.cloud.dataflow.sdk.util.TimerInternals;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.util.WindowingInternals;
-import com.google.cloud.dataflow.sdk.util.state.*;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import com.google.common.collect.AbstractIterator;
-import com.google.common.collect.Iterables;
-
-import org.apache.beam.runners.spark.util.BroadcastHelper;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-abstract class SparkProcessContext<I, O, V> extends DoFn<I, O>.ProcessContext {
-
-  private static final Logger LOG = LoggerFactory.getLogger(SparkProcessContext.class);
-
-  private final DoFn<I, O> fn;
-  private final SparkRuntimeContext mRuntimeContext;
-  private final Map<TupleTag<?>, BroadcastHelper<?>> mSideInputs;
-
-  protected WindowedValue<I> windowedValue;
-
-  SparkProcessContext(DoFn<I, O> fn,
-      SparkRuntimeContext runtime,
-      Map<TupleTag<?>, BroadcastHelper<?>> sideInputs) {
-    fn.super();
-    this.fn = fn;
-    this.mRuntimeContext = runtime;
-    this.mSideInputs = sideInputs;
-  }
-
-  void setup() {
-    setupDelegateAggregators();
-  }
-
-  @Override
-  public PipelineOptions getPipelineOptions() {
-    return mRuntimeContext.getPipelineOptions();
-  }
-
-  @Override
-  public <T> T sideInput(PCollectionView<T> view) {
-    @SuppressWarnings("unchecked")
-    BroadcastHelper<Iterable<WindowedValue<?>>> broadcastHelper =
-        (BroadcastHelper<Iterable<WindowedValue<?>>>) mSideInputs.get(view.getTagInternal());
-    Iterable<WindowedValue<?>> contents = broadcastHelper.getValue();
-    return view.fromIterableInternal(contents);
-  }
-
-  @Override
-  public abstract void output(O output);
-
-  public abstract void output(WindowedValue<O> output);
-
-  @Override
-  public <T> void sideOutput(TupleTag<T> tupleTag, T t) {
-    String message = "sideOutput is an unsupported operation for doFunctions, use a " +
-        "MultiDoFunction instead.";
-    LOG.warn(message);
-    throw new UnsupportedOperationException(message);
-  }
-
-  @Override
-  public <T> void sideOutputWithTimestamp(TupleTag<T> tupleTag, T t, Instant instant) {
-    String message =
-        "sideOutputWithTimestamp is an unsupported operation for doFunctions, use a " +
-        "MultiDoFunction instead.";
-    LOG.warn(message);
-    throw new UnsupportedOperationException(message);
-  }
-
-  @Override
-  public <AI, AA, AO> Aggregator<AI, AO> createAggregatorInternal(
-      String named,
-      Combine.CombineFn<AI, AA, AO> combineFn) {
-    return mRuntimeContext.createAggregator(named, combineFn);
-  }
-
-  @Override
-  public I element() {
-    return windowedValue.getValue();
-  }
-
-  @Override
-  public void outputWithTimestamp(O output, Instant timestamp) {
-    output(WindowedValue.of(output, timestamp,
-        windowedValue.getWindows(), windowedValue.getPane()));
-  }
-
-  @Override
-  public Instant timestamp() {
-    return windowedValue.getTimestamp();
-  }
-
-  @Override
-  public BoundedWindow window() {
-    if (!(fn instanceof DoFn.RequiresWindowAccess)) {
-      throw new UnsupportedOperationException(
-          "window() is only available in the context of a DoFn marked as RequiresWindow.");
-    }
-    return Iterables.getOnlyElement(windowedValue.getWindows());
-  }
-
-  @Override
-  public PaneInfo pane() {
-    return windowedValue.getPane();
-  }
-
-  @Override
-  public WindowingInternals<I, O> windowingInternals() {
-    return new WindowingInternals<I, O>() {
-
-      @Override
-      public Collection<? extends BoundedWindow> windows() {
-        return windowedValue.getWindows();
-      }
-
-      @Override
-      public void outputWindowedValue(O output, Instant timestamp,
-          Collection<? extends BoundedWindow> windows, PaneInfo paneInfo) {
-        output(WindowedValue.of(output, timestamp, windows, paneInfo));
-      }
-
-      @Override
-      public StateInternals stateInternals() {
-        //TODO: implement state internals.
-        // This is a temporary placeholder to get the TfIdfTest
-        // working for the initial Beam code drop.
-        return InMemoryStateInternals.forKey("DUMMY");
-      }
-
-      @Override
-      public TimerInternals timerInternals() {
-        throw new UnsupportedOperationException(
-            "WindowingInternals#timerInternals() is not yet supported.");
-      }
-
-      @Override
-      public PaneInfo pane() {
-        return windowedValue.getPane();
-      }
-
-      @Override
-      public <T> void writePCollectionViewData(TupleTag<?> tag,
-          Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException {
-        throw new UnsupportedOperationException(
-            "WindowingInternals#writePCollectionViewData() is not yet supported.");
-      }
-
-      @Override
-      public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
-        throw new UnsupportedOperationException(
-            "WindowingInternals#sideInput() is not yet supported.");
-      }
-    };
-  }
-
-  protected abstract void clearOutput();
-  protected abstract Iterator<V> getOutputIterator();
-
-  protected Iterable<V> getOutputIterable(final Iterator<WindowedValue<I>> iter,
-      final DoFn<I, O> doFn) {
-    return new Iterable<V>() {
-      @Override
-      public Iterator<V> iterator() {
-        return new ProcCtxtIterator(iter, doFn);
-      }
-    };
-  }
-
-  private class ProcCtxtIterator extends AbstractIterator<V> {
-
-    private final Iterator<WindowedValue<I>> inputIterator;
-    private final DoFn<I, O> doFn;
-    private Iterator<V> outputIterator;
-    private boolean calledFinish;
-
-    ProcCtxtIterator(Iterator<WindowedValue<I>> iterator, DoFn<I, O> doFn) {
-      this.inputIterator = iterator;
-      this.doFn = doFn;
-      this.outputIterator = getOutputIterator();
-    }
-
-    @Override
-    protected V computeNext() {
-      // Process each element from the (input) iterator, which produces zero, one or more
-      // output elements (of type V) in the output iterator. Note that the output
-      // collection (and iterator) is reset between each call to processElement, so the
-      // collection only holds the output values for each call to processElement, rather
-      // than for the whole partition (which would use too much memory).
-      while (true) {
-        if (outputIterator.hasNext()) {
-          return outputIterator.next();
-        } else if (inputIterator.hasNext()) {
-          clearOutput();
-          windowedValue = inputIterator.next();
-          try {
-            doFn.processElement(SparkProcessContext.this);
-          } catch (Exception e) {
-            throw new SparkProcessException(e);
-          }
-          outputIterator = getOutputIterator();
-        } else {
-          // no more input to consume, but finishBundle can produce more output
-          if (!calledFinish) {
-            clearOutput();
-            try {
-              calledFinish = true;
-              doFn.finishBundle(SparkProcessContext.this);
-            } catch (Exception e) {
-              throw new SparkProcessException(e);
-            }
-            outputIterator = getOutputIterator();
-            continue; // try to consume outputIterator from start of loop
-          }
-          return endOfData();
-        }
-      }
-    }
-  }
-
-  static class SparkProcessException extends RuntimeException {
-    SparkProcessException(Throwable t) {
-      super(t);
-    }
-  }
-
-}
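
ProcCtxtIterator's computeNext() is Guava's AbstractIterator state machine: drain the current output iterator, refill it by processing the next input element, and only then let finishBundle flush. The same pattern in isolation, as a stand-alone flattening iterator with no Beam types involved:

    import java.util.Collections;
    import java.util.Iterator;
    import java.util.List;

    import com.google.common.collect.AbstractIterator;

    public class FlattenIterator extends AbstractIterator<Integer> {
      private final Iterator<List<Integer>> input;
      private Iterator<Integer> current = Collections.emptyIterator();

      public FlattenIterator(Iterator<List<Integer>> input) {
        this.input = input;
      }

      @Override
      protected Integer computeNext() {
        while (true) {
          if (current.hasNext()) {
            return current.next();               // emit buffered output
          } else if (input.hasNext()) {
            current = input.next().iterator();   // the "processElement" step
          } else {
            return endOfData();                  // nothing left to flush
          }
        }
      }
    }
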
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRuntimeContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRuntimeContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRuntimeContext.java
deleted file mode 100644
index f0f9974..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRuntimeContext.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.CoderRegistry;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.runners.AggregatorValues;
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.cloud.dataflow.sdk.transforms.Max;
-import com.google.cloud.dataflow.sdk.transforms.Min;
-import com.google.cloud.dataflow.sdk.transforms.Sum;
-import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
-import com.google.common.collect.ImmutableList;
-
-import org.apache.beam.runners.spark.aggregators.AggAccumParam;
-import org.apache.beam.runners.spark.aggregators.NamedAggregators;
-
-import org.apache.spark.Accumulator;
-import org.apache.spark.api.java.JavaSparkContext;
-
-
-/**
- * The SparkRuntimeContext allows us to define useful features on the client side before our
- * data flow program is launched.
- */
-public class SparkRuntimeContext implements Serializable {
-  /**
-   * An accumulator that is a map from names to aggregators.
-   */
-  private final Accumulator<NamedAggregators> accum;
-
-  private final String serializedPipelineOptions;
-
-  /**
-   * Map of names to dataflow aggregators.
-   */
-  private final Map<String, Aggregator<?, ?>> aggregators = new HashMap<>();
-  private transient CoderRegistry coderRegistry;
-
-  SparkRuntimeContext(JavaSparkContext jsc, Pipeline pipeline) {
-    this.accum = jsc.accumulator(new NamedAggregators(), new AggAccumParam());
-    this.serializedPipelineOptions = serializePipelineOptions(pipeline.getOptions());
-  }
-
-  private static String serializePipelineOptions(PipelineOptions pipelineOptions) {
-    try {
-      return new ObjectMapper().writeValueAsString(pipelineOptions);
-    } catch (JsonProcessingException e) {
-      throw new IllegalStateException("Failed to serialize the pipeline options.", e);
-    }
-  }
-
-  private static PipelineOptions deserializePipelineOptions(String serializedPipelineOptions) {
-    try {
-      return new ObjectMapper().readValue(serializedPipelineOptions, PipelineOptions.class);
-    } catch (IOException e) {
-      throw new IllegalStateException("Failed to deserialize the pipeline options.", e);
-    }
-  }
-
-  /**
-   * Retrieves corresponding value of an aggregator.
-   *
-   * @param aggregatorName Name of the aggregator to retrieve the value of.
-   * @param typeClass      Type class of value to be retrieved.
-   * @param <T>            Type of object to be returned.
-   * @return The value of the aggregator.
-   */
-  public <T> T getAggregatorValue(String aggregatorName, Class<T> typeClass) {
-    return accum.value().getValue(aggregatorName, typeClass);
-  }
-
-  public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator) {
-    @SuppressWarnings("unchecked")
-    Class<T> aggValueClass = (Class<T>) aggregator.getCombineFn().getOutputType().getRawType();
-    final T aggregatorValue = getAggregatorValue(aggregator.getName(), aggValueClass);
-    return new AggregatorValues<T>() {
-      @Override
-      public Collection<T> getValues() {
-        return ImmutableList.of(aggregatorValue);
-      }
-
-      @Override
-      public Map<String, T> getValuesAtSteps() {
-        throw new UnsupportedOperationException("getValuesAtSteps is not supported.");
-      }
-    };
-  }
-
-  public synchronized PipelineOptions getPipelineOptions() {
-    return deserializePipelineOptions(serializedPipelineOptions);
-  }
-
-  /**
-   * Creates an aggregator and associates it with the specified name.
-   *
-   * @param named     Name of aggregator.
-   * @param combineFn Combine function used in aggregation.
-   * @param <IN>      Type of inputs to aggregator.
-   * @param <INTER>   Intermediate data type
-   * @param <OUT>     Type of aggregator outputs.
-   * @return Specified aggregator
-   */
-  public synchronized <IN, INTER, OUT> Aggregator<IN, OUT> createAggregator(
-      String named,
-      Combine.CombineFn<IN, INTER, OUT> combineFn) {
-    @SuppressWarnings("unchecked")
-    Aggregator<IN, OUT> aggregator = (Aggregator<IN, OUT>) aggregators.get(named);
-    if (aggregator == null) {
-      @SuppressWarnings("unchecked")
-      NamedAggregators.CombineFunctionState<IN, INTER, OUT> state =
-          new NamedAggregators.CombineFunctionState<>(
-              (Combine.CombineFn<IN, INTER, OUT>) combineFn,
-              (Coder<IN>) getCoder(combineFn),
-              this);
-      accum.add(new NamedAggregators(named, state));
-      aggregator = new SparkAggregator<>(named, state);
-      aggregators.put(named, aggregator);
-    }
-    return aggregator;
-  }
-
-  public CoderRegistry getCoderRegistry() {
-    if (coderRegistry == null) {
-      coderRegistry = new CoderRegistry();
-      coderRegistry.registerStandardCoders();
-    }
-    return coderRegistry;
-  }
-
-  private Coder<?> getCoder(Combine.CombineFn<?, ?, ?> combiner) {
-    try {
-      if (combiner.getClass() == Sum.SumIntegerFn.class) {
-        return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Integer.class));
-      } else if (combiner.getClass() == Sum.SumLongFn.class) {
-        return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Long.class));
-      } else if (combiner.getClass() == Sum.SumDoubleFn.class) {
-        return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Double.class));
-      } else if (combiner.getClass() == Min.MinIntegerFn.class) {
-        return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Integer.class));
-      } else if (combiner.getClass() == Min.MinLongFn.class) {
-        return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Long.class));
-      } else if (combiner.getClass() == Min.MinDoubleFn.class) {
-        return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Double.class));
-      } else if (combiner.getClass() == Max.MaxIntegerFn.class) {
-        return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Integer.class));
-      } else if (combiner.getClass() == Max.MaxLongFn.class) {
-        return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Long.class));
-      } else if (combiner.getClass() == Max.MaxDoubleFn.class) {
-        return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Double.class));
-      } else {
-        throw new IllegalArgumentException("unsupported combiner in Aggregator: "
-            + combiner.getClass().getName());
-      }
-    } catch (CannotProvideCoderException e) {
-      throw new IllegalStateException("Could not determine default coder for combiner", e);
-    }
-  }
-
-  /**
-   * Initialize spark aggregators exactly once.
-   *
-   * @param <IN> Type of element fed in to aggregator.
-   */
-  private static class SparkAggregator<IN, OUT> implements Aggregator<IN, OUT>, Serializable {
-    private final String name;
-    private final NamedAggregators.State<IN, ?, OUT> state;
-
-    SparkAggregator(String name, NamedAggregators.State<IN, ?, OUT> state) {
-      this.name = name;
-      this.state = state;
-    }
-
-    @Override
-    public String getName() {
-      return name;
-    }
-
-    @Override
-    public void addValue(IN elem) {
-      state.update(elem);
-    }
-
-    @Override
-    public Combine.CombineFn<IN, ?, OUT> getCombineFn() {
-      return state.getCombineFn();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkStreamingPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkStreamingPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkStreamingPipelineOptions.java
new file mode 100644
index 0000000..be40313
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkStreamingPipelineOptions.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark;
+
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+
+/**
+ * Options used to configure Spark streaming.
+ */
+public interface SparkStreamingPipelineOptions extends SparkPipelineOptions {
+  @Description("Timeout to wait (in msec) for the streaming execution to stop, -1 runs until " +
+      "execution is stopped")
+  @Default.Long(-1)
+  Long getTimeout();
+
+  void setTimeout(Long batchInterval);
+
+  @Override
+  @Default.Boolean(true)
+  boolean isStreaming();
+
+  @Override
+  @Default.String("spark streaming dataflow pipeline job")
+  String getAppName();
+}
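
The new interface is instantiated like any other PipelineOptions; a short sketch using the same PipelineOptionsFactory.as(...) call that SparkPipelineOptionsFactory wraps, with a hypothetical class name:

    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.runners.spark.SparkStreamingPipelineOptions;

    public class StreamingOptionsExample {
      public static void main(String[] args) {
        SparkStreamingPipelineOptions options =
            PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class);
        options.setTimeout(5000L); // stop the streaming execution after 5s
        System.out.println(options.getTimeout() + " / " + options.isStreaming());
      }
    }
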
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/main/java/org/apache/beam/runners/spark/TransformEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TransformEvaluator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TransformEvaluator.java
deleted file mode 100644
index 4b4f81f..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TransformEvaluator.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark;
-
-import java.io.Serializable;
-
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-
-public interface TransformEvaluator<PT extends PTransform<?, ?>> extends Serializable {
-  void evaluate(PT transform, EvaluationContext context);
-}
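
After the move, every Spark translation is one TransformEvaluator implementation keyed by transform class and dispatched through SparkPipelineTranslator.translate(). A hypothetical skeleton showing the contract only (the real evaluators live in TransformTranslator, which is not shown in this part of the diff):

    import com.google.cloud.dataflow.sdk.transforms.PTransform;
    import org.apache.beam.runners.spark.translation.EvaluationContext;
    import org.apache.beam.runners.spark.translation.TransformEvaluator;

    // Hypothetical no-op evaluator; illustrates the shape, not a real translation.
    class NoOpEvaluator<PT extends PTransform<?, ?>> implements TransformEvaluator<PT> {
      @Override
      public void evaluate(PT transform, EvaluationContext context) {
        // A real evaluator reads the input RDD from the context, applies the
        // Spark equivalent of the transform, and registers the output RDD.
      }
    }
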