From: staslevin@apache.org
To: commits@beam.apache.org
Date: Thu, 24 Aug 2017 06:43:42 -0000
Subject: [1/2] beam git commit: [BEAM-2671] Implemented an InputDStream that syncs up with the watermark values; this should help with streaming tests in spark-runner.

Repository: beam
Updated Branches:
  refs/heads/master c4517d04c -> 5181e619f

[BEAM-2671] Implemented an InputDStream that syncs up with the watermark values; this should help with streaming tests in spark-runner.
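In essence, the commit gates RDD generation on watermark progress: the new WatermarkSyncedDStream only emits an RDD for a batch once the batch that last advanced the global watermark is at most one batch-interval behind it. A minimal sketch of that gating loop, using names taken from the diff below (simplified; the real compute() additionally logs wait times and handles an empty RDD queue):

    // Hedged sketch of WatermarkSyncedDStream's sync condition; all times in milliseconds.
    private boolean watermarkOutOfSync(final long batchTime) {
      // The batch that last advanced the watermark must be at most one batch-interval behind.
      return batchTime - GlobalWatermarkHolder.getLastWatermarkedBatchTime() > batchDuration;
    }

    private void awaitWatermarkSyncWith(final long batchTime) {
      // Busy-wait with short sleeps until the watermark catches up; the very first
      // batch is exempt since no watermark has been published yet.
      while (!isFirstBatch() && watermarkOutOfSync(batchTime)) {
        Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS);
      }
    }
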
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/15472b28 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/15472b28 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/15472b28 Branch: refs/heads/master Commit: 15472b28c649381b90a0405d80012aa8523d13c5 Parents: c4517d0 Author: Stas Levin Authored: Sun Aug 20 16:48:57 2017 +0300 Committer: Stas Levin Committed: Thu Aug 24 09:42:12 2017 +0300 ---------------------------------------------------------------------- .../apache/beam/runners/spark/SparkRunner.java | 5 +- .../beam/runners/spark/io/CreateStream.java | 104 ++++--- .../SparkGroupAlsoByWindowViaWindowSet.java | 158 +++++++--- .../spark/stateful/SparkTimerInternals.java | 6 + .../streaming/StreamingTransformTranslator.java | 71 +++-- .../streaming/WatermarkSyncedDStream.java | 149 +++++++++ .../spark/util/GlobalWatermarkHolder.java | 302 +++++++++++++------ .../runners/spark/SparkPipelineStateTest.java | 4 +- .../translation/streaming/CreateStreamTest.java | 33 +- .../spark/src/test/resources/log4j.properties | 11 +- 10 files changed, 633 insertions(+), 210 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/15472b28/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java index 595521f..98ca1be 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java @@ -40,7 +40,7 @@ import org.apache.beam.runners.spark.translation.TransformEvaluator; import org.apache.beam.runners.spark.translation.TransformTranslator; import org.apache.beam.runners.spark.translation.streaming.Checkpoint.CheckpointDir; import org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory; -import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.WatermarksListener; +import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.WatermarkAdvancingStreamingListener; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineRunner; import org.apache.beam.sdk.io.Read; @@ -171,7 +171,8 @@ public final class SparkRunner extends PipelineRunner { } // register Watermarks listener to broadcast the advanced WMs. 
- jssc.addStreamingListener(new JavaStreamingListenerWrapper(new WatermarksListener())); + jssc.addStreamingListener( + new JavaStreamingListenerWrapper(new WatermarkAdvancingStreamingListener())); // The reason we call initAccumulators here even though it is called in // SparkRunnerStreamingContextFactory is because the factory is not called when resuming http://git-wip-us.apache.org/repos/asf/beam/blob/15472b28/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java index d485d25..4c73d95 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java @@ -41,34 +41,34 @@ import org.joda.time.Instant; /** * Create an input stream from Queue. For SparkRunner tests only. * - *

To properly compose a stream of micro-batches with their Watermarks, please keep in mind - * that eventually there a two queues here - one for batches and another for Watermarks. + *

To properly compose a stream of micro-batches with their Watermarks, please keep in mind that + * eventually there are two queues here - one for batches and another for Watermarks. * - *

While both queues advance according to Spark's batch-interval, there is a slight difference - * in how data is pushed into the stream compared to the advancement of Watermarks since Watermarks + *

While both queues advance according to Spark's batch-interval, there is a slight difference in + * how data is pushed into the stream compared to the advancement of Watermarks since Watermarks * advance onBatchCompleted hook call so if you'd want to set the watermark advance for a specific - * batch it should be called before that batch. - * Also keep in mind that being a queue that is polled per batch interval, if there is a need to - * "hold" the same Watermark without advancing it it should be stated explicitly or the Watermark - * will advance as soon as it can (in the next batch completed hook). + * batch it should be called before that batch. Also keep in mind that being a queue that is polled + * per batch interval, if there is a need to "hold" the same Watermark without advancing it, it + * should be stated explicitly or the Watermark will advance as soon as it can (in the next batch + * completed hook). * *

Example 1: * - * {@code - * CreateStream.>withBatchInterval(batchDuration) - * .nextBatch( - * TimestampedValue.of("foo", endOfGlobalWindow), - * TimestampedValue.of("bar", endOfGlobalWindow)) - * .advanceNextBatchWatermarkToInfinity(); - * } - * The first batch will see the default start-of-time WM of - * {@link BoundedWindow#TIMESTAMP_MIN_VALUE} and any following batch will see - * the end-of-time WM {@link BoundedWindow#TIMESTAMP_MAX_VALUE}. + *

{@code
+ * CreateStream.of(StringUtf8Coder.of(), batchDuration)
+ *   .nextBatch(
+ *     TimestampedValue.of("foo", endOfGlobalWindow),
+ *     TimestampedValue.of("bar", endOfGlobalWindow))
+ *   .advanceNextBatchWatermarkToInfinity();
+ * }
+ * The first batch will see the default start-of-time WM of {@link + * BoundedWindow#TIMESTAMP_MIN_VALUE} and any following batch will see the end-of-time WM {@link + * BoundedWindow#TIMESTAMP_MAX_VALUE}. * *

Example 2: * - * {@code - * CreateStream.>withBatchInterval(batchDuration) + *

{@code
+ * CreateStream.of(VarIntCoder.of(), batchDuration)
  *     .nextBatch(
  *         TimestampedValue.of(1, instant))
  *     .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(20)))
@@ -77,32 +77,59 @@ import org.joda.time.Instant;
  *     .nextBatch(
  *         TimestampedValue.of(3, instant))
  *     .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(30)))
- * }
- * The first batch will see the start-of-time WM and the second will see the advanced (+20 min.) WM.
- * The third WM will see the WM advanced to +30 min, because this is the next advancement of the WM
- * regardless of where it ws called in the construction of CreateStream.
- * //TODO: write a proper Builder enforcing all those rules mentioned.
- * @param <T> stream type.
+ * }
+ * + *

+ * The first batch will see the start-of-time WM and the second will see the advanced (+20 min.) + * WM. The third batch will see the WM advanced to +30 min, because this is the next advancement + * of the WM regardless of where it was called in the construction of CreateStream. + *

+ * + * @param The type of the element in this stream. */ +//TODO: write a proper Builder enforcing all those rules mentioned. public final class CreateStream extends PTransform> { - private final Duration batchInterval; + private final Duration batchDuration; private final Queue>> batches = new LinkedList<>(); private final Deque times = new LinkedList<>(); private final Coder coder; private Instant initialSystemTime; + private final boolean forceWatermarkSync; private Instant lowWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE; //for test purposes. - private CreateStream(Duration batchInterval, Instant initialSystemTime, Coder coder) { - this.batchInterval = batchInterval; + private CreateStream(Duration batchDuration, + Instant initialSystemTime, + Coder coder, + boolean forceWatermarkSync) { + this.batchDuration = batchDuration; this.initialSystemTime = initialSystemTime; this.coder = coder; + this.forceWatermarkSync = forceWatermarkSync; } - /** Set the batch interval for the stream. */ - public static CreateStream of(Coder coder, Duration batchInterval) { - return new CreateStream<>(batchInterval, new Instant(0), coder); + /** + * Creates a new Spark based stream intended for test purposes. + * + * @param batchDuration the batch duration (interval) to be used for creating this stream. + * @param coder the coder to be used for this stream. + * @param forceWatermarkSync whether this stream should be synced with the advancement of the + * watermark maintained by the + * {@link org.apache.beam.runners.spark.util.GlobalWatermarkHolder}. + */ + public static CreateStream of(Coder coder, + Duration batchDuration, + boolean forceWatermarkSync) { + return new CreateStream<>(batchDuration, new Instant(0), coder, forceWatermarkSync); + } + + /** + * Creates a new Spark based stream without forced watermark sync, intended for test purposes. + * See also {@link CreateStream#of(Coder, Duration, boolean)}. + */ + public static CreateStream of(Coder coder, Duration batchDuration) { + return of(coder, batchDuration, true); } /** @@ -112,8 +139,7 @@ public final class CreateStream extends PTransform> { @SafeVarargs public final CreateStream nextBatch(TimestampedValue... batchElements) { // validate timestamps if timestamped elements. - for (TimestampedValue element: batchElements) { - TimestampedValue timestampedValue = (TimestampedValue) element; + for (final TimestampedValue timestampedValue: batchElements) { checkArgument( timestampedValue.getTimestamp().isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE), "Elements must have timestamps before %s. Got: %s", @@ -177,7 +203,7 @@ public final class CreateStream extends PTransform> { // advance the system time. Instant currentSynchronizedProcessingTime = times.peekLast() == null ? initialSystemTime : times.peekLast().getSynchronizedProcessingTime(); - Instant nextSynchronizedProcessingTime = currentSynchronizedProcessingTime.plus(batchInterval); + Instant nextSynchronizedProcessingTime = currentSynchronizedProcessingTime.plus(batchDuration); checkArgument( nextSynchronizedProcessingTime.isAfter(currentSynchronizedProcessingTime), "Synchronized processing time must always advance."); @@ -186,6 +212,10 @@ public final class CreateStream extends PTransform> { return this; } + public long getBatchDuration() { + return batchDuration.getMillis(); + } + /** Get the underlying queue representing the mock stream of micro-batches. 
*/ public Queue>> getBatches() { return batches; @@ -199,6 +229,10 @@ public final class CreateStream extends PTransform> { return times; } + public boolean isForceWatermarkSync() { + return forceWatermarkSync; + } + @Override public PCollection expand(PBegin input) { return PCollection.createPrimitiveOutputInternal( http://git-wip-us.apache.org/repos/asf/beam/blob/15472b28/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java index 1263618..52f7376 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java @@ -17,12 +17,15 @@ */ package org.apache.beam.runners.spark.stateful; +import com.google.common.base.Joiner; import com.google.common.collect.AbstractIterator; +import com.google.common.collect.Lists; import com.google.common.collect.Table; import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Map; import org.apache.beam.runners.core.GroupAlsoByWindowsAggregators; import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow; import org.apache.beam.runners.core.LateDataUtils; @@ -46,6 +49,7 @@ import org.apache.beam.runners.spark.util.GlobalWatermarkHolder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.metrics.MetricName; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; @@ -61,6 +65,7 @@ import org.apache.spark.api.java.JavaSparkContext$; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function; import org.apache.spark.streaming.Duration; +import org.apache.spark.streaming.Time; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.dstream.DStream; @@ -104,12 +109,13 @@ public class SparkGroupAlsoByWindowViaWindowSet { public static JavaDStream>>> groupAlsoByWindow( - final JavaDStream>>>> inputDStream, - final Coder keyCoder, - final Coder> wvCoder, - final WindowingStrategy windowingStrategy, - final SerializablePipelineOptions options, - final List sourceIds) { + final JavaDStream>>>> inputDStream, + final Coder keyCoder, + final Coder> wvCoder, + final WindowingStrategy windowingStrategy, + final SerializablePipelineOptions options, + final List sourceIds, + final String transformFullName) { final long batchDurationMillis = options.get().as(SparkPipelineOptions.class).getBatchIntervalMillis(); @@ -140,30 +146,44 @@ public class SparkGroupAlsoByWindowViaWindowSet { DStream>*/ byte[]>> pairDStream = inputDStream .transformToPair( - new Function< + new org.apache.spark.api.java.function.Function2< JavaRDD>>>>, - JavaPairRDD>() { + Time, JavaPairRDD>() { // we use mapPartitions with the RDD API because its the only available API // that allows to preserve partitioning. 
@Override public JavaPairRDD call( - JavaRDD>>>> rdd) + JavaRDD>>>> rdd, + final Time time) throws Exception { return rdd.mapPartitions( - TranslationUtils.functionToFlatMapFunction( - WindowingHelpers - .>>>unwindowFunction()), - true) - .mapPartitionsToPair( - TranslationUtils - .>>toPairFlatMapFunction(), - true) - // move to bytes representation and use coders for deserialization - // because of checkpointing. - .mapPartitionsToPair( - TranslationUtils.pairFunctionToPairFlatMapFunction( - CoderHelpers.toByteFunction(keyCoder, itrWvCoder)), - true); + TranslationUtils.functionToFlatMapFunction( + WindowingHelpers + .>>>unwindowFunction()), + true) + .mapPartitionsToPair( + TranslationUtils + .>>toPairFlatMapFunction(), + true) + .mapValues(new Function>, KV>>>() { + + @Override + public KV>> call + (Iterable> values) + throws Exception { + // add the batch timestamp for visibility (e.g., debugging) + return KV.of(time.milliseconds(), values); + } + }) + // move to bytes representation and use coders for deserialization + // because of checkpointing. + .mapPartitionsToPair( + TranslationUtils.pairFunctionToPairFlatMapFunction( + CoderHelpers.toByteFunction(keyCoder, + KvCoder.of(VarLongCoder.of(), + itrWvCoder))), + true); } }) .dstream(); @@ -219,9 +239,10 @@ public class SparkGroupAlsoByWindowViaWindowSet { GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_LATENESS_COUNTER)); AbstractIterator< - Tuple2>>*/ List>>> + Tuple2>>>*/ + List>>> outIter = new AbstractIterator>>*/ List>>>() { + Tuple2>>>*/ List>>>() { @Override protected Tuple2>>*/ List>> computeNext() { @@ -240,8 +261,11 @@ public class SparkGroupAlsoByWindowViaWindowSet { List>> prevStateAndTimersOpt = next._3(); SparkStateInternals stateInternals; + Map watermarks = + GlobalWatermarkHolder.get(batchDurationMillis); SparkTimerInternals timerInternals = SparkTimerInternals.forStreamFromSources( - sourceIds, GlobalWatermarkHolder.get(batchDurationMillis)); + sourceIds, watermarks); + // get state(internals) per key. if (prevStateAndTimersOpt.isEmpty()) { // no previous state. @@ -271,20 +295,49 @@ public class SparkGroupAlsoByWindowViaWindowSet { options.get()); outputHolder.clear(); // clear before potential use. + if (!seq.isEmpty()) { // new input for key. try { - Iterable> elementsIterable = - CoderHelpers.fromByteArray(seq.head(), itrWvCoder); - Iterable> validElements = - LateDataUtils - .dropExpiredWindows( - key, - elementsIterable, - timerInternals, - windowingStrategy, - droppedDueToLateness); - reduceFnRunner.processElements(validElements); + final KV>> keyedElements = + CoderHelpers.fromByteArray(seq.head(), + KvCoder.of(VarLongCoder.of(), itrWvCoder)); + + final Long rddTimestamp = keyedElements.getKey(); + + LOG.debug( + transformFullName + + ": processing RDD with timestamp: {}, watermarks: {}", + rddTimestamp, + watermarks); + + final Iterable> elements = keyedElements.getValue(); + + LOG.trace(transformFullName + ": input elements: {}", elements); + + /* + Incoming expired windows are filtered based on + timerInternals.currentInputWatermarkTime() and the configured allowed + lateness. Note that this is done prior to calling + timerInternals.advanceWatermark so essentially the inputWatermark is + the highWatermark of the previous batch and the lowWatermark of the + current batch. + The highWatermark of the current batch will only affect filtering + as of the next batch. 
+ */ + final Iterable> nonExpiredElements = + Lists.newArrayList(LateDataUtils + .dropExpiredWindows( + key, + elements, + timerInternals, + windowingStrategy, + droppedDueToLateness)); + + LOG.trace(transformFullName + ": non expired input elements: {}", + elements); + + reduceFnRunner.processElements(nonExpiredElements); } catch (Exception e) { throw new RuntimeException( "Failed to process element with ReduceFnRunner", e); @@ -295,9 +348,28 @@ public class SparkGroupAlsoByWindowViaWindowSet { } try { // advance the watermark to HWM to fire by timers. + LOG.debug(transformFullName + ": timerInternals before advance are {}", + timerInternals.toString()); + + // store the highWatermark as the new inputWatermark to calculate triggers timerInternals.advanceWatermark(); + + LOG.debug(transformFullName + ": timerInternals after advance are {}", + timerInternals.toString()); + // call on timers that are ready. - reduceFnRunner.onTimers(timerInternals.getTimersReadyToProcess()); + final Collection readyToProcess = + timerInternals.getTimersReadyToProcess(); + + LOG.debug(transformFullName + ": ready timers are {}", readyToProcess); + + /* + Note that at this point, the watermark has already advanced since + timerInternals.advanceWatermark() has been called and the highWatermark + is now stored as the new inputWatermark, according to which triggers are + calculated. + */ + reduceFnRunner.onTimers(readyToProcess); } catch (Exception e) { throw new RuntimeException( "Failed to process ReduceFnRunner onTimer.", e); @@ -306,10 +378,20 @@ public class SparkGroupAlsoByWindowViaWindowSet { reduceFnRunner.persist(); // obtain output, if fired. List>>> outputs = outputHolder.get(); + if (!outputs.isEmpty() || !stateInternals.getState().isEmpty()) { + // empty outputs are filtered later using DStream filtering StateAndTimers updated = new StateAndTimers(stateInternals.getState(), SparkTimerInternals.serializeTimers( timerInternals.getTimers(), timerDataCoder)); + + /* + Not something we want to happen in production, but is very helpful + when debugging - TRACE. + */ + LOG.trace(transformFullName + ": output elements are {}", + Joiner.on(", ").join(outputs)); + // persist Spark's state by outputting. 
List serOutput = CoderHelpers.toByteArrays(outputs, wvKvIterCoder); return new Tuple2<>(encodedKey, new Tuple2<>(updated, serOutput)); http://git-wip-us.apache.org/repos/asf/beam/blob/15472b28/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java index a68da55..c998328 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java @@ -188,4 +188,10 @@ public class SparkTimerInternals implements TimerInternals { return CoderHelpers.fromByteArrays(serTimers, timerDataCoder); } + @Override + public String toString() { + return "SparkTimerInternals{" + "highWatermark=" + highWatermark + + ", synchronizedProcessingTime=" + synchronizedProcessingTime + ", timers=" + timers + + ", inputWatermark=" + inputWatermark + '}'; + } } http://git-wip-us.apache.org/repos/asf/beam/blob/15472b28/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java index 38d6119..4114803 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java @@ -82,6 +82,7 @@ import org.apache.spark.Accumulator; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.JavaSparkContext$; import org.apache.spark.api.java.function.Function; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaInputDStream; @@ -139,18 +140,41 @@ public final class StreamingTransformTranslator { return new TransformEvaluator>() { @Override public void evaluate(CreateStream transform, EvaluationContext context) { - Coder coder = context.getOutput(transform).getCoder(); - JavaStreamingContext jssc = context.getStreamingContext(); - Queue>> values = transform.getBatches(); - WindowedValue.FullWindowedValueCoder windowCoder = + + final Queue>> rddQueue = + buildRdds( + transform.getBatches(), + context.getStreamingContext(), + context.getOutput(transform).getCoder()); + + final JavaInputDStream> javaInputDStream = + buildInputStream(rddQueue, transform, context); + + final UnboundedDataset unboundedDataset = + new UnboundedDataset<>( + javaInputDStream, Collections.singletonList(javaInputDStream.inputDStream().id())); + + // add pre-baked Watermarks for the pre-baked batches. 
+ GlobalWatermarkHolder.addAll( + ImmutableMap.of(unboundedDataset.getStreamSources().get(0), transform.getTimes())); + + context.putDataset(transform, unboundedDataset); + } + + private Queue>> buildRdds( + Queue>> batches, JavaStreamingContext jssc, Coder coder) { + + final WindowedValue.FullWindowedValueCoder windowCoder = WindowedValue.FullWindowedValueCoder.of(coder, GlobalWindow.Coder.INSTANCE); - // create the DStream from queue. - Queue>> rddQueue = new LinkedBlockingQueue<>(); - for (Iterable> tv : values) { - Iterable> windowedValues = + + final Queue>> rddQueue = new LinkedBlockingQueue<>(); + + for (final Iterable> timestampedValues : batches) { + final Iterable> windowedValues = Iterables.transform( - tv, + timestampedValues, new com.google.common.base.Function, WindowedValue>() { + @Override public WindowedValue apply(@Nonnull TimestampedValue timestampedValue) { return WindowedValue.of( @@ -159,22 +183,28 @@ public final class StreamingTransformTranslator { GlobalWindow.INSTANCE, PaneInfo.NO_FIRING); } - }); - JavaRDD> rdd = + }); + + final JavaRDD> rdd = jssc.sparkContext() .parallelize(CoderHelpers.toByteArrays(windowedValues, windowCoder)) .map(CoderHelpers.fromByteFunction(windowCoder)); + rddQueue.offer(rdd); } + return rddQueue; + } - JavaInputDStream> inputDStream = jssc.queueStream(rddQueue, true); - UnboundedDataset unboundedDataset = new UnboundedDataset( - inputDStream, Collections.singletonList(inputDStream.inputDStream().id())); - // add pre-baked Watermarks for the pre-baked batches. - Queue times = transform.getTimes(); - GlobalWatermarkHolder.addAll( - ImmutableMap.of(unboundedDataset.getStreamSources().get(0), times)); - context.putDataset(transform, unboundedDataset); + private JavaInputDStream> buildInputStream( + Queue>> rddQueue, + CreateStream transform, + EvaluationContext context) { + return transform.isForceWatermarkSync() + ? new JavaInputDStream<>( + new WatermarkSyncedDStream<>( + rddQueue, transform.getBatchDuration(), context.getStreamingContext().ssc()), + JavaSparkContext$.MODULE$.>fakeClassTag()) + : context.getStreamingContext().queueStream(rddQueue, true); } @Override @@ -301,7 +331,8 @@ public final class StreamingTransformTranslator { wvCoder, windowingStrategy, context.getSerializableOptions(), - streamSources); + streamSources, + context.getCurrentTransform().getFullName()); context.putDataset(transform, new UnboundedDataset<>(outStream, streamSources)); } http://git-wip-us.apache.org/repos/asf/beam/blob/15472b28/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/WatermarkSyncedDStream.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/WatermarkSyncedDStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/WatermarkSyncedDStream.java new file mode 100644 index 0000000..e2a7b44 --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/WatermarkSyncedDStream.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.spark.translation.streaming; + +import static com.google.common.base.Preconditions.checkState; +import com.google.common.base.Stopwatch; +import com.google.common.util.concurrent.Uninterruptibles; +import java.util.Queue; +import java.util.concurrent.TimeUnit; +import org.apache.beam.runners.spark.util.GlobalWatermarkHolder; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext$; +import org.apache.spark.rdd.RDD; +import org.apache.spark.streaming.StreamingContext; +import org.apache.spark.streaming.Time; +import org.apache.spark.streaming.dstream.InputDStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An {@link InputDStream} that keeps track of the {@link GlobalWatermarkHolder} status and only + * generates RDDs when they are in sync. If an RDD for time CURRENT_BATCH_TIME is + * requested, this input source will wait until the time of the batch which set the watermark has + * caught up and the following holds: + * + * {@code + * CURRENT_BATCH_TIME - TIME_OF_BATCH_WHICH_SET_THE_WATERMARK <= BATCH_DURATION + * } + * + *

In other words, this input source will stall and will NOT generate RDDs when the watermark is + * too far behind. Once the watermark has caught up with the current batch time, an RDD will be + * generated and emitted downstream. + * + *

NOTE: This input source is intended for test-use only, where one needs to be able to simulate + * non-trivial scenarios under a deterministic execution even at the cost of incorporating test-only + * code. Unlike in tests, in production InputDStreams will not be synchronous with the + * watermark, and the watermark is allowed to lag behind in a non-deterministic manner (since at + * this point in time we are reluctant to apply complex and possibly overly synchronous mechanisms + * at large scale). + * + *

See also BEAM-2671, BEAM-2789. + */ +class WatermarkSyncedDStream extends InputDStream> { + + private static final Logger LOG = + LoggerFactory.getLogger(WatermarkSyncedDStream.class.getCanonicalName() + "#compute"); + + private static final int SLEEP_DURATION_MILLIS = 10; + + private final Queue>> rdds; + private final Long batchDuration; + private volatile boolean isFirst = true; + + public WatermarkSyncedDStream(final Queue>> rdds, + final Long batchDuration, + final StreamingContext ssc) { + super(ssc, JavaSparkContext$.MODULE$.>fakeClassTag()); + this.rdds = rdds; + this.batchDuration = batchDuration; + } + + private void awaitWatermarkSyncWith(final long batchTime) { + while (!isFirstBatch() && watermarkOutOfSync(batchTime)) { + Uninterruptibles.sleepUninterruptibly(SLEEP_DURATION_MILLIS, TimeUnit.MILLISECONDS); + } + + checkState( + isFirstBatch() || watermarkIsOneBatchBehind(batchTime), + String.format( + "Watermark batch time:[%d] should be exactly one batch behind current batch time:[%d]", + GlobalWatermarkHolder.getLastWatermarkedBatchTime(), batchTime)); + } + + private boolean watermarkOutOfSync(final long batchTime) { + return batchTime - GlobalWatermarkHolder.getLastWatermarkedBatchTime() > batchDuration; + } + + private boolean isFirstBatch() { + return isFirst; + } + + private RDD> generateRdd() { + return rdds.size() > 0 + ? rdds.poll().rdd() + : ssc().sparkContext().emptyRDD(JavaSparkContext$.MODULE$.>fakeClassTag()); + } + + private boolean watermarkIsOneBatchBehind(final long batchTime) { + return GlobalWatermarkHolder.getLastWatermarkedBatchTime() == batchTime - batchDuration; + } + + @Override + public scala.Option>> compute(final Time validTime) { + final long batchTime = validTime.milliseconds(); + + LOG.trace("BEFORE waiting for watermark sync, " + + "LastWatermarkedBatchTime: {}, current batch time: {}", + GlobalWatermarkHolder.getLastWatermarkedBatchTime(), + batchTime); + + final Stopwatch stopwatch = Stopwatch.createStarted(); + + awaitWatermarkSyncWith(batchTime); + + stopwatch.stop(); + + LOG.info("Waited {} millis for watermarks to sync up with the current batch ({})", + stopwatch.elapsed(TimeUnit.MILLISECONDS), + batchTime); + + LOG.info("Watermarks are now: {}", GlobalWatermarkHolder.get(batchDuration)); + + LOG.trace("AFTER waiting for watermark sync, " + + "LastWatermarkedBatchTime: {}, current batch time: {}", + GlobalWatermarkHolder.getLastWatermarkedBatchTime(), + batchTime); + + final RDD> rdd = generateRdd(); + isFirst = false; + return scala.Option.apply(rdd); + } + + @Override + public void start() { + + } + + @Override + public void stop() { + + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/15472b28/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java index 2cb6f26..8ad3ca4 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java @@ -41,9 +41,12 @@ import org.apache.spark.storage.BlockManager; import org.apache.spark.storage.BlockResult; import org.apache.spark.storage.BlockStore; import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.api.java.JavaBatchInfo; import 
org.apache.spark.streaming.api.java.JavaStreamingListener; import org.apache.spark.streaming.api.java.JavaStreamingListenerBatchCompleted; import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.Option; /** @@ -53,11 +56,18 @@ import scala.Option; * and advances the watermarks according to the queue (first-in-first-out). */ public class GlobalWatermarkHolder { + + private static final Logger LOG = LoggerFactory.getLogger(GlobalWatermarkHolder.class); + private static final Map> sourceTimes = new HashMap<>(); private static final BlockId WATERMARKS_BLOCK_ID = BlockId.apply("broadcast_0WATERMARKS"); - private static volatile Map driverWatermarks = null; + // a local copy of the watermarks is stored on the driver node so that it can be + // accessed in test mode instead of fetching blocks remotely + private static volatile Map driverNodeWatermarks = null; + private static volatile LoadingCache> watermarkCache = null; + private static volatile long lastWatermarkedBatchTime = 0; public static void add(int sourceId, SparkWatermarks sparkWatermarks) { Queue timesQueue = sourceTimes.get(sourceId); @@ -79,18 +89,33 @@ public class GlobalWatermarkHolder { } } + public static long getLastWatermarkedBatchTime() { + return lastWatermarkedBatchTime; + } + /** * Returns the {@link Broadcast} containing the {@link SparkWatermarks} mapped * to their sources. */ @SuppressWarnings("unchecked") public static Map get(Long cacheInterval) { - if (driverWatermarks != null) { - // if we are executing in local mode simply return the local values. - return driverWatermarks; + if (canBypassRemoteWatermarkFetching()) { + /* + driverNodeWatermarks != null => + => advance() was called + => WatermarkAdvancingStreamingListener#onBatchCompleted() was called + => we are currently running on the driver node + => we can get the watermarks from the driver local copy instead of fetching their block + remotely using block manger + /------------------------------------------------------------------------------------------/ + In test mode, the system is running inside a single JVM, and thus both driver and executors + "canBypassWatermarkBlockFetching" by using the static driverNodeWatermarks copy. + This allows tests to avoid the asynchronous nature of using the BlockManager directly. + */ + return getLocalWatermarkCopy(); } else { if (watermarkCache == null) { - initWatermarkCache(cacheInterval); + watermarkCache = createWatermarkCache(cacheInterval); } try { return watermarkCache.get("SINGLETON"); @@ -100,103 +125,178 @@ public class GlobalWatermarkHolder { } } - private static synchronized void initWatermarkCache(Long batchDuration) { - if (watermarkCache == null) { - watermarkCache = - CacheBuilder.newBuilder() - // expire watermarks every half batch duration to ensure they update in every batch. - .expireAfterWrite(batchDuration / 2, TimeUnit.MILLISECONDS) - .build(new WatermarksLoader()); - } + private static boolean canBypassRemoteWatermarkFetching() { + return driverNodeWatermarks != null; + } + + private static synchronized LoadingCache> + createWatermarkCache(final Long batchDuration) { + return CacheBuilder.newBuilder() + // expire watermarks every half batch duration to ensure they update in every batch. + .expireAfterWrite(batchDuration / 2, TimeUnit.MILLISECONDS) + .build(new WatermarksLoader()); } /** * Advances the watermarks to the next-in-line watermarks. * SparkWatermarks are monotonically increasing. 
*/ - @SuppressWarnings("unchecked") - public static void advance() { + public static void advance(final String batchId) { synchronized (GlobalWatermarkHolder.class) { - BlockManager blockManager = SparkEnv.get().blockManager(); + final BlockManager blockManager = SparkEnv.get().blockManager(); + final Map newWatermarks = computeNewWatermarks(blockManager); - if (sourceTimes.isEmpty()) { - return; + if (!newWatermarks.isEmpty()) { + writeRemoteWatermarkBlock(newWatermarks, blockManager); + writeLocalWatermarkCopy(newWatermarks); + } else { + LOG.info("No new watermarks could be computed upon completion of batch: {}", batchId); } + } + } + + private static void writeLocalWatermarkCopy(Map newWatermarks) { + driverNodeWatermarks = newWatermarks; + } - // update all sources' watermarks into the new broadcast. - Map newValues = new HashMap<>(); - - for (Map.Entry> en: sourceTimes.entrySet()) { - if (en.getValue().isEmpty()) { - continue; - } - Integer sourceId = en.getKey(); - Queue timesQueue = en.getValue(); - - // current state, if exists. - Instant currentLowWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE; - Instant currentHighWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE; - Instant currentSynchronizedProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE; - - Option currentOption = blockManager.getRemote(WATERMARKS_BLOCK_ID); - Map current; - if (currentOption.isDefined()) { - current = (Map) currentOption.get().data().next(); - } else { - current = Maps.newHashMap(); - blockManager.putSingle( - WATERMARKS_BLOCK_ID, - current, - StorageLevel.MEMORY_ONLY(), - true); - } - - if (current.containsKey(sourceId)) { - SparkWatermarks currentTimes = current.get(sourceId); - currentLowWatermark = currentTimes.getLowWatermark(); - currentHighWatermark = currentTimes.getHighWatermark(); - currentSynchronizedProcessingTime = currentTimes.getSynchronizedProcessingTime(); - } - - SparkWatermarks next = timesQueue.poll(); - // advance watermarks monotonically. - Instant nextLowWatermark = next.getLowWatermark().isAfter(currentLowWatermark) - ? next.getLowWatermark() : currentLowWatermark; - Instant nextHighWatermark = next.getHighWatermark().isAfter(currentHighWatermark) - ? next.getHighWatermark() : currentHighWatermark; - Instant nextSynchronizedProcessingTime = next.getSynchronizedProcessingTime(); - checkState(!nextLowWatermark.isAfter(nextHighWatermark), - String.format( - "Low watermark %s cannot be later then high watermark %s", - nextLowWatermark, nextHighWatermark)); - checkState(nextSynchronizedProcessingTime.isAfter(currentSynchronizedProcessingTime), - "Synchronized processing time must advance."); - newValues.put( - sourceId, - new SparkWatermarks( - nextLowWatermark, nextHighWatermark, nextSynchronizedProcessingTime)); + private static Map getLocalWatermarkCopy() { + return driverNodeWatermarks; + } + + /** See {@link GlobalWatermarkHolder#advance(String)}. */ + public static void advance() { + advance("N/A"); + } + + /** + * Computes the next watermark values per source id. + * + * @return The new watermarks values or null if no source has reported its progress. + */ + private static Map computeNewWatermarks(BlockManager blockManager) { + + if (sourceTimes.isEmpty()) { + return new HashMap<>(); + } + + // update all sources' watermarks into the new broadcast. + final Map newValues = new HashMap<>(); + + for (final Map.Entry> watermarkInfo: sourceTimes.entrySet()) { + + if (watermarkInfo.getValue().isEmpty()) { + continue; } - // update the watermarks broadcast only if something has changed. 
- if (!newValues.isEmpty()) { - driverWatermarks = newValues; - blockManager.removeBlock(WATERMARKS_BLOCK_ID, true); - blockManager.putSingle( - WATERMARKS_BLOCK_ID, - newValues, - StorageLevel.MEMORY_ONLY(), - true); + final Integer sourceId = watermarkInfo.getKey(); + + // current state, if exists. + Instant currentLowWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE; + Instant currentHighWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE; + Instant currentSynchronizedProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE; + + final Map currentWatermarks = initWatermarks(blockManager); + + if (currentWatermarks.containsKey(sourceId)) { + final SparkWatermarks currentTimes = currentWatermarks.get(sourceId); + currentLowWatermark = currentTimes.getLowWatermark(); + currentHighWatermark = currentTimes.getHighWatermark(); + currentSynchronizedProcessingTime = currentTimes.getSynchronizedProcessingTime(); } + + final Queue timesQueue = watermarkInfo.getValue(); + final SparkWatermarks next = timesQueue.poll(); + + // advance watermarks monotonically. + + final Instant nextLowWatermark = + next.getLowWatermark().isAfter(currentLowWatermark) + ? next.getLowWatermark() + : currentLowWatermark; + + final Instant nextHighWatermark = + next.getHighWatermark().isAfter(currentHighWatermark) + ? next.getHighWatermark() + : currentHighWatermark; + + final Instant nextSynchronizedProcessingTime = next.getSynchronizedProcessingTime(); + + checkState( + !nextLowWatermark.isAfter(nextHighWatermark), + String.format( + "Low watermark %s cannot be later then high watermark %s", + nextLowWatermark, nextHighWatermark)); + + checkState( + nextSynchronizedProcessingTime.isAfter(currentSynchronizedProcessingTime), + "Synchronized processing time must advance."); + + newValues.put( + sourceId, + new SparkWatermarks( + nextLowWatermark, nextHighWatermark, nextSynchronizedProcessingTime)); + } + + return newValues; + } + + private static void writeRemoteWatermarkBlock( + final Map newWatermarks, final BlockManager blockManager) { + blockManager.removeBlock(WATERMARKS_BLOCK_ID, true); + // if an executor tries to fetch the watermark block here, it will fail to do so since + // the watermark block has just been removed, but the new copy has not been put yet. 
+ blockManager.putSingle(WATERMARKS_BLOCK_ID, newWatermarks, StorageLevel.MEMORY_ONLY(), true); + // if an executor tries to fetch the watermark block here, it still may fail to do so since + // the put operation might not have been executed yet + // see also https://issues.apache.org/jira/browse/BEAM-2789 + LOG.info("Put new watermark block: {}", newWatermarks); + } + + private static Map initWatermarks(final BlockManager blockManager) { + + final Map watermarks = fetchSparkWatermarks(blockManager); + + if (watermarks == null) { + final HashMap empty = Maps.newHashMap(); + blockManager.putSingle( + WATERMARKS_BLOCK_ID, + empty, + StorageLevel.MEMORY_ONLY(), + true); + return empty; + } else { + return watermarks; + } + } + + private static Map fetchSparkWatermarks(BlockManager blockManager) { + final Option blockResultOption = blockManager.getRemote(WATERMARKS_BLOCK_ID); + if (blockResultOption.isDefined()) { + return (Map) blockResultOption.get().data().next(); + } else { + return null; + } + } + + private static class WatermarksLoader extends CacheLoader> { + + @SuppressWarnings("unchecked") + @Override + public Map load(@Nonnull String key) throws Exception { + final BlockManager blockManager = SparkEnv.get().blockManager(); + final Map watermarks = fetchSparkWatermarks(blockManager); + return watermarks != null ? watermarks : Maps.newHashMap(); } } @VisibleForTesting public static synchronized void clear() { sourceTimes.clear(); - driverWatermarks = null; - SparkEnv sparkEnv = SparkEnv.get(); + lastWatermarkedBatchTime = 0; + writeLocalWatermarkCopy(null); + final SparkEnv sparkEnv = SparkEnv.get(); if (sparkEnv != null) { - BlockManager blockManager = sparkEnv.blockManager(); + final BlockManager blockManager = sparkEnv.blockManager(); blockManager.removeBlock(WATERMARKS_BLOCK_ID, true); } } @@ -242,25 +342,33 @@ public class GlobalWatermarkHolder { } /** Advance the WMs onBatchCompleted event. 
*/ - public static class WatermarksListener extends JavaStreamingListener { - @Override - public void onBatchCompleted(JavaStreamingListenerBatchCompleted batchCompleted) { - GlobalWatermarkHolder.advance(); + public static class WatermarkAdvancingStreamingListener extends JavaStreamingListener { + private static final Logger LOG = + LoggerFactory.getLogger(WatermarkAdvancingStreamingListener.class); + + private long timeOf(JavaBatchInfo info) { + return info.batchTime().milliseconds(); } - } - private static class WatermarksLoader extends CacheLoader> { + private long laterOf(long t1, long t2) { + return Math.max(t1, t2); + } - @SuppressWarnings("unchecked") @Override - public Map load(@Nonnull String key) throws Exception { - Option blockResultOption = - SparkEnv.get().blockManager().getRemote(WATERMARKS_BLOCK_ID); - if (blockResultOption.isDefined()) { - return (Map) blockResultOption.get().data().next(); - } else { - return Maps.newHashMap(); - } + public void onBatchCompleted(JavaStreamingListenerBatchCompleted batchCompleted) { + + final long currentBatchTime = timeOf(batchCompleted.batchInfo()); + + GlobalWatermarkHolder.advance(Long.toString(currentBatchTime)); + + // make sure to update the last watermarked batch time AFTER the watermarks have already + // been updated (i.e., after the call to GlobalWatermarkHolder.advance(...)) + // in addition, the watermark's block in the BlockManager is updated in an asynchronous manner + lastWatermarkedBatchTime = + laterOf(lastWatermarkedBatchTime, currentBatchTime); + + LOG.info("Batch with timestamp: {} has completed, watermarks have been updated.", + lastWatermarkedBatchTime); } } } http://git-wip-us.apache.org/repos/asf/beam/blob/15472b28/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java index cfbad01..a5455da 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java @@ -73,8 +73,10 @@ public class SparkPipelineStateTest implements Serializable { } private PTransform> getValues(final SparkPipelineOptions options) { + final boolean doNotSyncWithWatermark = false; return options.isStreaming() - ? CreateStream.of(StringUtf8Coder.of(), Duration.millis(1)).nextBatch("one", "two") + ? 
CreateStream.of(StringUtf8Coder.of(), Duration.millis(1), doNotSyncWithWatermark) + .nextBatch("one", "two") : Create.of("one", "two"); } http://git-wip-us.apache.org/repos/asf/beam/blob/15472b28/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java index 770e0c0..a432fda 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java @@ -163,16 +163,16 @@ public class CreateStreamTest implements Serializable { public void testDiscardingMode() throws IOException { CreateStream source = CreateStream.of(StringUtf8Coder.of(), batchDuration()) - .nextBatch( - TimestampedValue.of("firstPane", new Instant(100)), - TimestampedValue.of("alsoFirstPane", new Instant(200))) - .advanceWatermarkForNextBatch(new Instant(1001L)) - .nextBatch( - TimestampedValue.of("onTimePane", new Instant(500))) - .advanceNextBatchWatermarkToInfinity() - .nextBatch( - TimestampedValue.of("finalLatePane", new Instant(750)), - TimestampedValue.of("alsoFinalLatePane", new Instant(250))); + .nextBatch( + TimestampedValue.of("firstPane", new Instant(100)), + TimestampedValue.of("alsoFirstPane", new Instant(200))) + .advanceWatermarkForNextBatch(new Instant(1001L)) + .nextBatch( + TimestampedValue.of("onTimePane", new Instant(500))) + .advanceNextBatchWatermarkToInfinity() + .nextBatch( + TimestampedValue.of("finalLatePane", new Instant(750)), + TimestampedValue.of("alsoFinalLatePane", new Instant(250))); FixedWindows windowFn = FixedWindows.of(Duration.millis(1000L)); Duration allowedLateness = Duration.millis(5000L); @@ -212,12 +212,13 @@ public class CreateStreamTest implements Serializable { Instant lateElementTimestamp = new Instant(-1_000_000); CreateStream source = CreateStream.of(StringUtf8Coder.of(), batchDuration()) - .emptyBatch() - .advanceWatermarkForNextBatch(new Instant(0)) - .nextBatch( - TimestampedValue.of("late", lateElementTimestamp), - TimestampedValue.of("onTime", new Instant(100))) - .advanceNextBatchWatermarkToInfinity(); + .emptyBatch() + .advanceWatermarkForNextBatch(new Instant(0)) + .emptyBatch() + .nextBatch( + TimestampedValue.of("late", lateElementTimestamp), + TimestampedValue.of("onTime", new Instant(100))) + .advanceNextBatchWatermarkToInfinity(); FixedWindows windowFn = FixedWindows.of(Duration.millis(1000L)); Duration allowedLateness = Duration.millis(5000L); http://git-wip-us.apache.org/repos/asf/beam/blob/15472b28/runners/spark/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/resources/log4j.properties b/runners/spark/src/test/resources/log4j.properties index 66e83c8..010c7df 100644 --- a/runners/spark/src/test/resources/log4j.properties +++ b/runners/spark/src/test/resources/log4j.properties @@ -24,7 +24,16 @@ log4j.rootLogger=ERROR, testlogger log4j.appender.testlogger=org.apache.log4j.ConsoleAppender log4j.appender.testlogger.target = System.err log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout -log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n 
+log4j.appender.testlogger.layout.ConversionPattern=%d [%t] %-5p %c %x - %m%n

# TestSparkRunner prints general information about test pipeline execution.
log4j.logger.org.apache.beam.runners.spark.TestSparkRunner=INFO
+
+# in case of an emergency - uncomment (or better yet, stay calm and uncomment).
+#log4j.logger.org.apache.beam=TRACE
+#log4j.logger.org.apache.beam.sdk.Pipeline=INFO
+#log4j.logger.org.apache.beam.sdk.coders=INFO
+#log4j.logger.org.apache.beam.sdk.runners.TransformHierarchy=ERROR
+#log4j.logger.org.apache.beam.runners.spark.SparkRunner$Evaluator=ERROR
+#log4j.logger.org.apache.beam.runners.spark.translation.streaming.WatermarkSyncedDStream#compute=INFO
+#log4j.logger.org.apache.beam.runners.spark.translation.streaming.WatermarkSyncedDStream=ERROR
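
For reference, this is roughly how the synced stream is exercised by the tests touched above (a sketch assembled from the CreateStreamTest changes; pipeline construction and assertions are elided):

    // CreateStream.of(coder, batchDuration) forces watermark sync by default;
    // pass 'false' as a third argument to opt out (as SparkPipelineStateTest now does).
    CreateStream<String> source =
        CreateStream.of(StringUtf8Coder.of(), batchDuration())
            .emptyBatch()
            .advanceWatermarkForNextBatch(new Instant(0))
            .emptyBatch()
            .nextBatch(
                TimestampedValue.of("late", lateElementTimestamp),
                TimestampedValue.of("onTime", new Instant(100)))
            .advanceNextBatchWatermarkToInfinity();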