beam-commits mailing list archives

From amits...@apache.org
Subject [12/23] beam git commit: Make TimestampTransform Serializable.
Date Tue, 28 Feb 2017 22:35:19 GMT
Make TimestampTransform Serializable.

Rebase leftover fixes.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f47e0eba
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f47e0eba
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f47e0eba

Branch: refs/heads/master
Commit: f47e0ebacd4c915e2a8c8b4e18e792926745d673
Parents: 24ab605
Author: Sela <ansela@paypal.com>
Authored: Mon Feb 20 19:45:03 2017 +0200
Committer: Sela <ansela@paypal.com>
Committed: Wed Mar 1 00:18:04 2017 +0200

----------------------------------------------------------------------
 .../aggregators/AggregatorsAccumulator.java     |  2 +-
 .../spark/metrics/MetricsAccumulator.java       |  4 --
 .../spark/metrics/SparkMetricsContainer.java    |  5 +-
 .../spark/stateful/SparkStateInternals.java     | 16 +++++++
 .../streaming/StreamingTransformTranslator.java |  2 +-
 .../translation/streaming/UnboundedDataset.java |  1 -
 .../ResumeFromCheckpointStreamingTest.java      | 48 ++++++++++++++++++--
 .../windowing/TimestampTransform.java           |  3 +-
 8 files changed, 64 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f47e0eba/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggregatorsAccumulator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggregatorsAccumulator.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggregatorsAccumulator.java
index a4dfda6..261c327 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggregatorsAccumulator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggregatorsAccumulator.java
@@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory;
 
 
 /**
- * For resilience, {@link Accumulator}s are required to be wrapped in a Singleton.
+ * For resilience, {@link Accumulator Accumulators} are required to be wrapped in a Singleton.
  * @see <a href="https://spark.apache.org/docs/1.6.3/streaming-programming-guide.html#accumulators-and-broadcast-variables">accumulators</a>
  */
 public class AggregatorsAccumulator {
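
Why the singleton: when a Spark streaming context is recovered from a checkpoint,
Accumulators are not restored automatically, so the runner keeps them behind a
process-wide holder that lazily re-creates them. A minimal sketch of the pattern
against Spark 1.6's Java API -- an illustrative Integer accumulator, not Beam's
actual class:

    import org.apache.spark.Accumulator;
    import org.apache.spark.api.java.JavaSparkContext;

    // Process-wide holder: a volatile field plus double-checked locking makes
    // getInstance() safe from any thread, including after checkpoint recovery.
    public final class ExampleAccumulatorSingleton {
      private static volatile Accumulator<Integer> instance = null;

      private ExampleAccumulatorSingleton() {}

      public static Accumulator<Integer> getInstance(JavaSparkContext jsc) {
        if (instance == null) {
          synchronized (ExampleAccumulatorSingleton.class) {
            if (instance == null) {
              // Named accumulators also show up in the Spark web UI.
              instance = jsc.accumulator(0, "example");
            }
          }
        }
        return instance;
      }
    }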

http://git-wip-us.apache.org/repos/asf/beam/blob/f47e0eba/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java
index f27a826..9d48289 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java
@@ -53,9 +53,6 @@ public class MetricsAccumulator {
     if (instance == null) {
       synchronized (MetricsAccumulator.class) {
         if (instance == null) {
-          // TODO: currently when recovering from checkpoint, Spark does not recover the
-          // last known Accumulator value. The SparkRunner should be able to persist and recover
-          // the SparkMetricsContainer in order to recover metrics as well.
           SparkMetricsContainer initialValue = new SparkMetricsContainer();
           instance = jsc.sc().accumulator(initialValue, "Beam.Metrics",
               new MetricsAccumulatorParam());
@@ -94,7 +91,6 @@ public class MetricsAccumulator {
     }
   }
 
-  @SuppressWarnings("unused")
   @VisibleForTesting
   static void clear() {
     synchronized (MetricsAccumulator.class) {
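
Two changes in this file: the stale TODO about recovering accumulator values from
a checkpoint is dropped, and clear() loses @SuppressWarnings("unused") now that it
is reachable from SparkMetricsContainer.clear() below. One caveat worth spelling
out: double-checked locking as in getInstance() is only correct on the JVM when
the guarded field is volatile. The field declaration this pattern assumes (name
from the hunk, element type inferred from the MetricsAccumulatorParam accumulator
created above):

    private static volatile Accumulator<SparkMetricsContainer> instance = null;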

http://git-wip-us.apache.org/repos/asf/beam/blob/f47e0eba/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java
index 0bf0e70..7a4b222 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java
@@ -145,10 +145,7 @@ public class SparkMetricsContainer implements Serializable {
   @VisibleForTesting
   public static void clear() {
     try {
-      SparkMetricsContainer instance = getInstance();
-      instance.initializeMetricsContainers();
-      instance.counters.clear();
-      instance.distributions.clear();
+      MetricsAccumulator.clear();
     } catch (IllegalStateException ignored) {
     }
   }
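
Delegating to MetricsAccumulator.clear() leaves a single owner for the
accumulator's lifecycle: rather than mutating the container's internal maps in
place, the whole singleton is dropped and re-created on next use. Either entry
point now resets the same underlying state, e.g. between runs in a test:

    // Sketch: reset metrics state between pipeline runs in a test.
    SparkMetricsContainer.clear();  // delegates to MetricsAccumulator.clear()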

http://git-wip-us.apache.org/repos/asf/beam/blob/f47e0eba/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
index 93b1f63..43fb383 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
@@ -38,7 +38,9 @@ import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.util.CombineFnUtil;
 import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.MapState;
 import org.apache.beam.sdk.util.state.ReadableState;
+import org.apache.beam.sdk.util.state.SetState;
 import org.apache.beam.sdk.util.state.State;
 import org.apache.beam.sdk.util.state.StateContext;
 import org.apache.beam.sdk.util.state.StateContexts;
@@ -121,6 +123,20 @@ class SparkStateInternals<K> implements StateInternals<K> {
     }
 
     @Override
+    public <T> SetState<T> bindSet(StateTag<? super K, SetState<T>> spec, Coder<T> elemCoder) {
+      throw new UnsupportedOperationException(
+          String.format("%s is not supported", SetState.class.getSimpleName()));
+    }
+
+    @Override
+    public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
+        StateTag<? super K, MapState<KeyT, ValueT>> spec,
+        Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) {
+      throw new UnsupportedOperationException(
+          String.format("%s is not supported", MapState.class.getSimpleName()));
+    }
+
+    @Override
     public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
         bindCombiningValue(
             StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
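
The new bindSet/bindMap bindings fail fast, so a pipeline that declares SetState
or MapState on this runner errors out at state-binding time with a clear message
("SetState is not supported") instead of misbehaving later. A generic sketch of
the fail-fast binder idea, deliberately independent of Beam's StateTag/Coder
machinery:

    import java.util.Map;
    import java.util.Set;

    // Illustrative only: reject state kinds the backend cannot support yet.
    interface ExampleStateBinder {
      <T> Set<T> bindSet(String id);
      <K, V> Map<K, V> bindMap(String id);
    }

    final class FailFastStateBinder implements ExampleStateBinder {
      @Override
      public <T> Set<T> bindSet(String id) {
        throw new UnsupportedOperationException(
            String.format("%s is not supported for id %s", "SetState", id));
      }

      @Override
      public <K, V> Map<K, V> bindMap(String id) {
        throw new UnsupportedOperationException(
            String.format("%s is not supported for id %s", "MapState", id));
      }
    }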

http://git-wip-us.apache.org/repos/asf/beam/blob/f47e0eba/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index a98eff2..fc98781 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -37,9 +37,9 @@ import org.apache.beam.runners.spark.coders.CoderHelpers;
 import org.apache.beam.runners.spark.io.ConsoleIO;
 import org.apache.beam.runners.spark.io.CreateStream;
 import org.apache.beam.runners.spark.io.SparkUnboundedSource;
-import org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet;
 import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
 import org.apache.beam.runners.spark.metrics.SparkMetricsContainer;
+import org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet;
 import org.apache.beam.runners.spark.translation.BoundedDataset;
 import org.apache.beam.runners.spark.translation.Dataset;
 import org.apache.beam.runners.spark.translation.DoFnFunction;

http://git-wip-us.apache.org/repos/asf/beam/blob/f47e0eba/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java
index d7f3f34..e9abe93 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java
@@ -22,7 +22,6 @@ import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.runners.spark.translation.Dataset;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.streaming.api.java.JavaDStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/beam/blob/f47e0eba/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
index e307363..4eea383 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
@@ -17,15 +17,19 @@
  */
 package org.apache.beam.runners.spark.translation.streaming;
 
+import static org.apache.beam.sdk.metrics.MetricMatchers.attemptedMetricsResult;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.io.IOException;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
@@ -33,17 +37,23 @@ import org.apache.beam.runners.spark.ReuseSparkContextRule;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.SparkPipelineResult;
 import org.apache.beam.runners.spark.SparkRunner;
-import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton;
+import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.beam.runners.spark.metrics.SparkMetricsContainer;
 import org.apache.beam.runners.spark.translation.streaming.utils.EmbeddedKafkaCluster;
 import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.InstantCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.kafka.KafkaIO;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.MetricNameFilter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.metrics.MetricsFilter;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.Keys;
@@ -52,6 +62,7 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -59,6 +70,7 @@ import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PDone;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -147,6 +159,11 @@ public class ResumeFromCheckpointStreamingTest {
         "k4", new Instant(400)
     ));
 
+    MetricsFilter metricsFilter =
+        MetricsFilter.builder()
+            .addNameFilter(MetricNameFilter.inNamespace(ResumeFromCheckpointStreamingTest.class))
+            .build();
+
     // first run will read from Kafka backlog - "auto.offset.reset=smallest"
     SparkPipelineResult res = run(options);
     res.waitUntilFinish(Duration.standardSeconds(2));
@@ -157,11 +174,15 @@ public class ResumeFromCheckpointStreamingTest {
             "Expected %d processed messages count but found %d", 4, processedMessages1),
         processedMessages1,
         equalTo(4L));
+    assertThat(res.metrics().queryMetrics(metricsFilter).counters(),
+        hasItem(attemptedMetricsResult(ResumeFromCheckpointStreamingTest.class.getName(),
+            "allMessages", "EOFShallNotPassFn", 4L)));
 
     //--- between executions:
 
     //- clear state.
-    AccumulatorSingleton.clear();
+    AggregatorsAccumulator.clear();
+    SparkMetricsContainer.clear();
     GlobalWatermarkHolder.clear();
 
     //- write a bit more.
@@ -175,11 +196,14 @@ public class ResumeFromCheckpointStreamingTest {
     res.waitUntilFinish(Duration.standardSeconds(2));
     // assertions 2:
     long processedMessages2 = res.getAggregatorValue("processedMessages", Long.class);
-    int successAssertions = res.getAggregatorValue(PAssert.SUCCESS_COUNTER, Integer.class);
     assertThat(
-        String.format("Expected %d processed messages count but found %d", 1, processedMessages2),
+        String.format("Expected %d processed messages count but found %d", 5, processedMessages2),
         processedMessages2,
         equalTo(5L));
+    assertThat(res.metrics().queryMetrics(metricsFilter).counters(),
+        hasItem(attemptedMetricsResult(ResumeFromCheckpointStreamingTest.class.getName(),
+            "allMessages", "EOFShallNotPassFn", 6L)));
+    int successAssertions = res.getAggregatorValue(PAssert.SUCCESS_COUNTER, Integer.class);
     res.getAggregatorValue(PAssert.SUCCESS_COUNTER, Integer.class);
     assertThat(
         String.format(
@@ -225,10 +249,14 @@ public class ResumeFromCheckpointStreamingTest {
 
     Pipeline p = Pipeline.create(options);
 
+    PCollection<String> expectedCol =
+        p.apply(Create.of(ImmutableList.of("side1", "side2")).withCoder(StringUtf8Coder.of()));
+    PCollectionView<List<String>> view = expectedCol.apply(View.<String>asList());
+
     PCollection<Iterable<String>> grouped = p
         .apply(read.withoutMetadata())
         .apply(Keys.<String>create())
-        .apply(ParDo.of(new EOFShallNotPassFn()))
+        .apply("EOFShallNotPassFn", ParDo.of(new EOFShallNotPassFn(view)).withSideInputs(view))
         .apply(Window.<String>into(FixedWindows.of(Duration.millis(500)))
             .triggering(AfterWatermark.pastEndOfWindow())
                 .accumulatingFiredPanes()
@@ -250,12 +278,22 @@ public class ResumeFromCheckpointStreamingTest {
 
   /** A pass-through fn that prevents EOF event from passing. */
   private static class EOFShallNotPassFn extends DoFn<String, String> {
+    final PCollectionView<List<String>> view;
     private final Aggregator<Long, Long> aggregator =
         createAggregator("processedMessages", Sum.ofLongs());
+    Counter counter =
+        Metrics.counter(ResumeFromCheckpointStreamingTest.class, "allMessages");
+
+    private EOFShallNotPassFn(PCollectionView<List<String>> view) {
+      this.view = view;
+    }
 
     @ProcessElement
     public void process(ProcessContext c) {
       String element = c.element();
+      // assert that side input is passed correctly before/after resuming from checkpoint.
+      assertThat(c.sideInput(view), containsInAnyOrder("side1", "side2"));
+      counter.inc();
       if (!element.equals("EOF")) {
         aggregator.addValue(1L);
         c.output(c.element());
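
The expected values encode the test's timeline. counter.inc() runs before the
EOF check, while the aggregator only increments for non-EOF elements, so
(inferring from the asserted totals) the writes between the two runs add one
real message plus an "EOF" marker:

    First run:   4 messages          -> allMessages = 4, processedMessages = 4
    Second run:  +1 message, +"EOF"  -> allMessages = 6, processedMessages = 5
                                        ("EOF" is counted but never output)

The assertThat on c.sideInput(view) additionally checks that the materialized
List side input is delivered intact both before and after resuming from the
checkpoint.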

http://git-wip-us.apache.org/repos/asf/beam/blob/f47e0eba/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampTransform.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampTransform.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampTransform.java
index b16e968..5318592 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampTransform.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampTransform.java
@@ -18,11 +18,12 @@
 package org.apache.beam.sdk.transforms.windowing;
 
 import com.google.auto.value.AutoValue;
+import java.io.Serializable;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 
 /** An abstract description of a standardized transformation on timestamps. */
-public abstract class TimestampTransform {
+public abstract class TimestampTransform implements Serializable {
 
   /** Returns a transform that shifts a timestamp later by {@code delay}. */
   public static TimestampTransform delay(Duration delay) {
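
The headline change of the commit: TimestampTransform instances end up embedded
in windowing configuration that runners serialize and ship to workers, so the
base class (and with it the AutoValue subclasses) must survive Java
serialization. A hedged sanity-check sketch, using only the delay() factory
visible in this hunk; the equals() at the end assumes the AutoValue-generated
subclass, and the class name is invented for illustration:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import org.apache.beam.sdk.transforms.windowing.TimestampTransform;
    import org.joda.time.Duration;

    public class TimestampTransformRoundTrip {
      public static void main(String[] args) throws Exception {
        TimestampTransform t = TimestampTransform.delay(Duration.standardSeconds(5));

        // Serialize: before this commit this threw NotSerializableException.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
          out.writeObject(t);
        }

        // Deserialize and compare; AutoValue supplies value-based equals().
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
          TimestampTransform back = (TimestampTransform) in.readObject();
          System.out.println(back.equals(t));  // expected: true
        }
      }
    }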

