beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amits...@apache.org
Subject [1/2] beam git commit: [BEAM-648] Persist and restore Aggergator values in case of recovery from failure
Date Mon, 30 Jan 2017 20:58:59 GMT
Repository: beam
Updated Branches:
  refs/heads/master 343176c00 -> 847e4e9f0


[BEAM-648] Persist and restore Aggergator values in case of recovery from failure

Added javadoc and minor refactor

Moved creation of beam checkpoint dir


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/62f9e7b1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/62f9e7b1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/62f9e7b1

Branch: refs/heads/master
Commit: 62f9e7b1e1a8a8f2317e3508ccce615f2b30d4f6
Parents: 343176c
Author: Aviem Zur <aviemzur@gmail.com>
Authored: Sun Jan 22 14:30:44 2017 +0200
Committer: Sela <ansela@paypal.com>
Committed: Mon Jan 30 22:53:34 2017 +0200

----------------------------------------------------------------------
 .../apache/beam/runners/spark/SparkRunner.java  | 21 ++++-
 .../spark/aggregators/AccumulatorSingleton.java | 96 ++++++++++++++++++--
 .../spark/aggregators/SparkAggregators.java     | 20 +++-
 .../translation/streaming/CheckpointDir.java    | 69 ++++++++++++++
 .../SparkRunnerStreamingContextFactory.java     | 44 ++++++---
 .../ResumeFromCheckpointStreamingTest.java      |  5 +-
 6 files changed, 230 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/62f9e7b1/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index 92c07bb..578ed21 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -18,12 +18,14 @@
 
 package org.apache.beam.runners.spark;
 
+import com.google.common.base.Optional;
 import com.google.common.collect.Iterables;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton;
 import org.apache.beam.runners.spark.aggregators.NamedAggregators;
 import org.apache.beam.runners.spark.aggregators.SparkAggregators;
 import org.apache.beam.runners.spark.aggregators.metrics.AggregatorMetricSource;
@@ -32,6 +34,7 @@ import org.apache.beam.runners.spark.translation.SparkContextFactory;
 import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
 import org.apache.beam.runners.spark.translation.TransformEvaluator;
 import org.apache.beam.runners.spark.translation.TransformTranslator;
+import org.apache.beam.runners.spark.translation.streaming.CheckpointDir;
 import org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.Read;
@@ -54,6 +57,7 @@ import org.apache.spark.SparkEnv$;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.metrics.MetricsSystem;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.api.java.JavaStreamingListenerWrapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -130,7 +134,11 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult>
{
   }
 
   private void registerMetrics(final SparkPipelineOptions opts, final JavaSparkContext jsc)
{
-    final Accumulator<NamedAggregators> accum = SparkAggregators.getNamedAggregators(jsc);
+    Optional<CheckpointDir> maybeCheckpointDir =
+        opts.isStreaming() ? Optional.of(new CheckpointDir(opts.getCheckpointDir()))
+            : Optional.<CheckpointDir>absent();
+    final Accumulator<NamedAggregators> accum =
+        SparkAggregators.getOrCreateNamedAggregators(jsc, maybeCheckpointDir);
     final NamedAggregators initialValue = accum.value();
 
     if (opts.getEnableSparkMetricSinks()) {
@@ -154,10 +162,17 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult>
{
     detectTranslationMode(pipeline);
 
     if (mOptions.isStreaming()) {
+      CheckpointDir checkpointDir = new CheckpointDir(mOptions.getCheckpointDir());
       final SparkRunnerStreamingContextFactory contextFactory =
-          new SparkRunnerStreamingContextFactory(pipeline, mOptions);
+          new SparkRunnerStreamingContextFactory(pipeline, mOptions, checkpointDir);
       final JavaStreamingContext jssc =
-          JavaStreamingContext.getOrCreate(mOptions.getCheckpointDir(), contextFactory);
+          JavaStreamingContext.getOrCreate(checkpointDir.getSparkCheckpointDir().toString(),
+              contextFactory);
+
+      // Checkpoint aggregator values
+      jssc.addStreamingListener(
+          new JavaStreamingListenerWrapper(
+              new AccumulatorSingleton.AccumulatorCheckpointingSparkListener()));
 
       startPipeline = executorService.submit(new Runnable() {
 

http://git-wip-us.apache.org/repos/asf/beam/blob/62f9e7b1/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AccumulatorSingleton.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AccumulatorSingleton.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AccumulatorSingleton.java
index 883830e..473750c 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AccumulatorSingleton.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AccumulatorSingleton.java
@@ -19,35 +19,119 @@
 package org.apache.beam.runners.spark.aggregators;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import org.apache.beam.runners.spark.translation.streaming.CheckpointDir;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.spark.Accumulator;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.streaming.api.java.JavaStreamingListener;
+import org.apache.spark.streaming.api.java.JavaStreamingListenerBatchCompleted;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 
 /**
  * For resilience, {@link Accumulator}s are required to be wrapped in a Singleton.
  * @see <a href="https://spark.apache.org/docs/1.6.3/streaming-programming-guide.html#accumulators-and-broadcast-variables">accumulators</a>
  */
-class AccumulatorSingleton {
+public class AccumulatorSingleton {
+  private static final Logger LOG = LoggerFactory.getLogger(AccumulatorSingleton.class);
+
+  private static final String ACCUMULATOR_CHECKPOINT_FILENAME = "beam_aggregators";
 
-  private static volatile Accumulator<NamedAggregators> instance = null;
+  private static volatile Accumulator<NamedAggregators> instance;
+  private static volatile FileSystem fileSystem;
+  private static volatile Path checkpointPath;
+  private static volatile Path tempCheckpointPath;
+  private static volatile Path backupCheckpointPath;
 
-  static Accumulator<NamedAggregators> getInstance(JavaSparkContext jsc) {
+  @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
+  static Accumulator<NamedAggregators> getInstance(
+      JavaSparkContext jsc,
+      Optional<CheckpointDir> checkpointDir) {
     if (instance == null) {
       synchronized (AccumulatorSingleton.class) {
         if (instance == null) {
-          //TODO: currently when recovering from checkpoint, Spark does not recover the
-          // last known Accumulator value. The SparkRunner should be able to persist and
recover
-          // the NamedAggregators in order to recover Aggregators as well.
           instance = jsc.sc().accumulator(new NamedAggregators(), new AggAccumParam());
+          if (checkpointDir.isPresent()) {
+            recoverValueFromCheckpoint(jsc, checkpointDir.get());
+          }
         }
       }
     }
     return instance;
   }
 
+  private static void recoverValueFromCheckpoint(
+      JavaSparkContext jsc,
+      CheckpointDir checkpointDir) {
+    FSDataInputStream is = null;
+    try {
+      Path beamCheckpointPath = checkpointDir.getBeamCheckpointDir();
+      checkpointPath = new Path(beamCheckpointPath, ACCUMULATOR_CHECKPOINT_FILENAME);
+      tempCheckpointPath = checkpointPath.suffix(".tmp");
+      backupCheckpointPath = checkpointPath.suffix(".bak");
+      fileSystem = checkpointPath.getFileSystem(jsc.hadoopConfiguration());
+      if (fileSystem.exists(checkpointPath)) {
+        is = fileSystem.open(checkpointPath);
+      } else if (fileSystem.exists(backupCheckpointPath)) {
+        is = fileSystem.open(backupCheckpointPath);
+      }
+      if (is != null) {
+        ObjectInputStream objectInputStream = new ObjectInputStream(is);
+        NamedAggregators recoveredValue =
+            (NamedAggregators) objectInputStream.readObject();
+        objectInputStream.close();
+        LOG.info("Recovered accumulators from checkpoint: " + recoveredValue);
+        instance.setValue(recoveredValue);
+      } else {
+        LOG.info("No accumulator checkpoint found.");
+      }
+    } catch (Exception e) {
+      throw new RuntimeException("Failure while reading accumulator checkpoint.", e);
+    }
+  }
+
+  private static void checkpoint() throws IOException {
+    if (checkpointPath != null) {
+      if (fileSystem.exists(checkpointPath)) {
+        if (fileSystem.exists(backupCheckpointPath)) {
+          fileSystem.delete(backupCheckpointPath, false);
+        }
+        fileSystem.rename(checkpointPath, backupCheckpointPath);
+      }
+      FSDataOutputStream os = fileSystem.create(tempCheckpointPath, true);
+      ObjectOutputStream oos = new ObjectOutputStream(os);
+      oos.writeObject(instance.value());
+      oos.close();
+      fileSystem.rename(tempCheckpointPath, checkpointPath);
+    }
+  }
+
   @VisibleForTesting
   static void clear() {
     synchronized (AccumulatorSingleton.class) {
       instance = null;
     }
   }
+
+  /**
+   * Spark Listener which checkpoints {@link NamedAggregators} values for fault-tolerance.
+   */
+  public static class AccumulatorCheckpointingSparkListener extends JavaStreamingListener
{
+    @Override
+    public void onBatchCompleted(JavaStreamingListenerBatchCompleted batchCompleted) {
+      try {
+        checkpoint();
+      } catch (IOException e) {
+        LOG.error("Failed to checkpoint accumulator singleton.", e);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/62f9e7b1/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java
index fa5c8d1..245c69e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java
@@ -18,12 +18,14 @@
 
 package org.apache.beam.runners.spark.aggregators;
 
+import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import java.util.Collection;
 import java.util.Map;
 import org.apache.beam.runners.core.AggregatorFactory;
 import org.apache.beam.runners.core.ExecutionContext;
 import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
+import org.apache.beam.runners.spark.translation.streaming.CheckpointDir;
 import org.apache.beam.sdk.AggregatorValues;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine;
@@ -66,10 +68,24 @@ public class SparkAggregators {
    *
    * @param jsc a Spark context to be used in order to retrieve the name
    * {@link NamedAggregators} instance
-   * @return a {@link NamedAggregators} instance
    */
   public static Accumulator<NamedAggregators> getNamedAggregators(JavaSparkContext
jsc) {
-    return AccumulatorSingleton.getInstance(jsc);
+    return getOrCreateNamedAggregators(jsc, Optional.<CheckpointDir>absent());
+  }
+
+  /**
+   * Retrieves or creates the {@link NamedAggregators} instance using the provided Spark
context.
+   *
+   * @param jsc a Spark context to be used in order to retrieve the name
+   * {@link NamedAggregators} instance
+   * @param checkpointDir checkpoint dir (optional, for streaming pipelines)
+   * @return a {@link NamedAggregators} instance
+   */
+  @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
+  public static Accumulator<NamedAggregators> getOrCreateNamedAggregators(
+      JavaSparkContext jsc,
+      Optional<CheckpointDir> checkpointDir) {
+    return AccumulatorSingleton.getInstance(jsc, checkpointDir);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/62f9e7b1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/CheckpointDir.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/CheckpointDir.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/CheckpointDir.java
new file mode 100644
index 0000000..5b192bd
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/CheckpointDir.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.translation.streaming;
+
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Spark checkpoint dir tree.
+ *
+ * {@link SparkPipelineOptions} checkpointDir is used as a root directory under which one
directory
+ * is created for Spark's checkpoint and another for Beam's Spark runner's fault tolerance
needs.
+ * Spark's checkpoint relies on Hadoop's {@link org.apache.hadoop.fs.FileSystem} and is used
for
+ * Beam as well rather than {@link org.apache.beam.sdk.io.FileSystem} to be consistent with
Spark.
+ */
+public class CheckpointDir {
+  private static final Logger LOG = LoggerFactory.getLogger(CheckpointDir.class);
+
+  private static final String SPARK_CHECKPOINT_DIR = "spark-checkpoint";
+  private static final String BEAM_CHECKPOINT_DIR = "beam-checkpoint";
+  private static final String KNOWN_RELIABLE_FS_PATTERN = "^(hdfs|s3|gs)";
+
+  private final Path rootCheckpointDir;
+  private final Path sparkCheckpointDir;
+  private final Path beamCheckpointDir;
+
+  public CheckpointDir(String rootCheckpointDir) {
+    if (!rootCheckpointDir.matches(KNOWN_RELIABLE_FS_PATTERN)) {
+      LOG.warn("The specified checkpoint dir {} does not match a reliable filesystem so in
case "
+          + "of failures this job may not recover properly or even at all.", rootCheckpointDir);
+    }
+    LOG.info("Checkpoint dir set to: {}", rootCheckpointDir);
+
+    this.rootCheckpointDir = new Path(rootCheckpointDir);
+    this.sparkCheckpointDir = new Path(rootCheckpointDir, SPARK_CHECKPOINT_DIR);
+    this.beamCheckpointDir = new Path(rootCheckpointDir, BEAM_CHECKPOINT_DIR);
+  }
+
+  public Path getRootCheckpointDir() {
+    return rootCheckpointDir;
+  }
+
+  public Path getSparkCheckpointDir() {
+    return sparkCheckpointDir;
+  }
+
+  public Path getBeamCheckpointDir() {
+    return beamCheckpointDir;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/62f9e7b1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
index d069a11..6d254e1 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.spark.translation.streaming;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import java.io.IOException;
 import org.apache.beam.runners.spark.SparkContextOptions;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.SparkRunner;
@@ -28,6 +29,8 @@ import org.apache.beam.runners.spark.translation.SparkContextFactory;
 import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
 import org.apache.beam.runners.spark.translation.TransformTranslator;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.streaming.Duration;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
@@ -45,14 +48,18 @@ import org.slf4j.LoggerFactory;
 public class SparkRunnerStreamingContextFactory implements JavaStreamingContextFactory {
   private static final Logger LOG =
       LoggerFactory.getLogger(SparkRunnerStreamingContextFactory.class);
-  private static final String KNOWN_RELIABLE_FS_PATTERN = "^(hdfs|s3|gs)";
 
   private final Pipeline pipeline;
   private final SparkPipelineOptions options;
+  private final CheckpointDir checkpointDir;
 
-  public SparkRunnerStreamingContextFactory(Pipeline pipeline, SparkPipelineOptions options)
{
+  public SparkRunnerStreamingContextFactory(
+      Pipeline pipeline,
+      SparkPipelineOptions options,
+      CheckpointDir checkpointDir) {
     this.pipeline = pipeline;
     this.options = options;
+    this.checkpointDir = checkpointDir;
   }
 
   private EvaluationContext ctxt;
@@ -73,18 +80,12 @@ public class SparkRunnerStreamingContextFactory implements JavaStreamingContextF
 
     JavaSparkContext jsc = SparkContextFactory.getSparkContext(options);
     JavaStreamingContext jssc = new JavaStreamingContext(jsc, batchDuration);
+
     ctxt = new EvaluationContext(jsc, pipeline, jssc);
     pipeline.traverseTopologically(new SparkRunner.Evaluator(translator, ctxt));
     ctxt.computeOutputs();
 
-    // set checkpoint dir.
-    String checkpointDir = options.getCheckpointDir();
-    if (!checkpointDir.matches(KNOWN_RELIABLE_FS_PATTERN)) {
-      LOG.warn("The specified checkpoint dir {} does not match a reliable filesystem so in
case "
-          + "of failures this job may not recover properly or even at all.", checkpointDir);
-    }
-    LOG.info("Checkpoint dir set to: {}", checkpointDir);
-    jssc.checkpoint(checkpointDir);
+    checkpoint(jssc);
 
     // register listeners.
     for (JavaStreamingListener listener: options.as(SparkContextOptions.class).getListeners())
{
@@ -95,7 +96,26 @@ public class SparkRunnerStreamingContextFactory implements JavaStreamingContextF
     return jssc;
   }
 
-  public EvaluationContext getCtxt() {
-    return ctxt;
+  private void checkpoint(JavaStreamingContext jssc) {
+    Path rootCheckpointPath = checkpointDir.getRootCheckpointDir();
+    Path sparkCheckpointPath = checkpointDir.getSparkCheckpointDir();
+    Path beamCheckpointPath = checkpointDir.getBeamCheckpointDir();
+
+    try {
+      FileSystem fileSystem = rootCheckpointPath.getFileSystem(jssc.sc().hadoopConfiguration());
+      if (!fileSystem.exists(rootCheckpointPath)) {
+        fileSystem.mkdirs(rootCheckpointPath);
+      }
+      if (!fileSystem.exists(sparkCheckpointPath)) {
+        fileSystem.mkdirs(sparkCheckpointPath);
+      }
+      if (!fileSystem.exists(beamCheckpointPath)) {
+        fileSystem.mkdirs(beamCheckpointPath);
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to create checkpoint dir", e);
+    }
+
+    jssc.checkpoint(sparkCheckpointPath.toString());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/62f9e7b1/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
index 7346bd9..8280672 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
@@ -81,6 +81,7 @@ public class ResumeFromCheckpointStreamingTest {
   );
   private static final String[] EXPECTED = {"k1,v1", "k2,v2", "k3,v3", "k4,v4"};
   private static final long EXPECTED_AGG_FIRST = 4L;
+  private static final long EXPECTED_AGG_SECOND = 8L;
 
   @Rule
   public TemporaryFolder checkpointParentDir = new TemporaryFolder();
@@ -141,8 +142,8 @@ public class ResumeFromCheckpointStreamingTest {
     res = runAgain(options);
     long processedMessages2 = res.getAggregatorValue("processedMessages", Long.class);
     assertThat(String.format("Expected %d processed messages count but "
-        + "found %d", EXPECTED_AGG_FIRST, processedMessages2), processedMessages2,
-            equalTo(EXPECTED_AGG_FIRST));
+        + "found %d", EXPECTED_AGG_SECOND, processedMessages2), processedMessages2,
+            equalTo(EXPECTED_AGG_SECOND));
   }
 
   private SparkPipelineResult runAgain(SparkPipelineOptions options) {


Mime
View raw message