From: amitsela@apache.org
To: commits@beam.apache.org
Reply-To: dev@beam.apache.org
Date: Wed, 29 Mar 2017 15:13:09 -0000
Message-Id: <54e78f1a733a4883b8f5ca9274aecbb3@git.apache.org>
Subject: [1/2] beam git commit: [BEAM-1827] Fix use of deprecated Spark APIs in the runner.

Repository: beam
Updated Branches:
  refs/heads/master 99056df36 -> 8a33591d9

[BEAM-1827] Fix use of deprecated Spark APIs in the runner.
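For readers following the patch below, the deprecated-to-current API mapping it applies is: JavaStreamingContextFactory -> Function0<JavaStreamingContext> (create() becomes call()), JavaStreamingContext.awaitTermination(long) -> awaitTerminationOrTimeout(long), and sc() -> sparkContext() on both the Java and Scala streaming contexts. The standalone sketch below is not part of the commit; it only illustrates the replacement pattern against a plain Spark Streaming job, and the checkpoint directory, host, and port are placeholders.

----------------------------------------------------------------------
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function0;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class NonDeprecatedStreamingContextExample {

  public static void main(String[] args) throws Exception {
    // Placeholder checkpoint directory, for illustration only.
    final String checkpointDir = "/tmp/spark-checkpoint-example";

    // Function0<JavaStreamingContext> replaces the deprecated JavaStreamingContextFactory.
    Function0<JavaStreamingContext> factory =
        new Function0<JavaStreamingContext>() {
          @Override
          public JavaStreamingContext call() throws Exception {
            SparkConf conf =
                new SparkConf().setMaster("local[*]").setAppName("deprecation-example");
            JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(1000));
            jssc.checkpoint(checkpointDir);
            // Placeholder source so the context has an output operation to run.
            jssc.socketTextStream("localhost", 9999).print();
            return jssc;
          }
        };

    // Recovers the context from the checkpoint if one exists, otherwise invokes the factory.
    JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(checkpointDir, factory);

    jssc.start();
    // awaitTerminationOrTimeout(long) replaces the deprecated awaitTermination(long).
    jssc.awaitTerminationOrTimeout(10000);
    jssc.stop();
  }
}
----------------------------------------------------------------------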
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6671b5b6 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6671b5b6 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6671b5b6 Branch: refs/heads/master Commit: 6671b5b6bae6c2a918481577ca2564bb45e7c280 Parents: 99056df Author: Amit Sela Authored: Wed Mar 29 10:39:49 2017 +0300 Committer: Amit Sela Committed: Wed Mar 29 15:56:21 2017 +0300 ---------------------------------------------------------------------- .../beam/runners/spark/SparkPipelineResult.java | 51 +++---- .../apache/beam/runners/spark/SparkRunner.java | 153 +++++++++---------- .../beam/runners/spark/io/SourceDStream.java | 4 +- .../SparkRunnerStreamingContextFactory.java | 43 +++--- 4 files changed, 113 insertions(+), 138 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/6671b5b6/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java index b2b2831..d2c5c8e 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java @@ -38,9 +38,7 @@ import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.joda.time.Duration; -/** - * Represents a Spark pipeline execution result. - */ +/** Represents a Spark pipeline execution result. */ public abstract class SparkPipelineResult implements PipelineResult { protected final Future pipelineExecution; @@ -48,8 +46,7 @@ public abstract class SparkPipelineResult implements PipelineResult { protected PipelineResult.State state; private final SparkMetricResults metricResults = new SparkMetricResults(); - SparkPipelineResult(final Future pipelineExecution, - final JavaSparkContext javaSparkContext) { + SparkPipelineResult(final Future pipelineExecution, final JavaSparkContext javaSparkContext) { this.pipelineExecution = pipelineExecution; this.javaSparkContext = javaSparkContext; // pipelineExecution is expected to have started executing eagerly. @@ -130,13 +127,10 @@ public abstract class SparkPipelineResult implements PipelineResult { return state; } - /** - * Represents the result of running a batch pipeline. - */ + /** Represents the result of running a batch pipeline. */ static class BatchMode extends SparkPipelineResult { - BatchMode(final Future pipelineExecution, - final JavaSparkContext javaSparkContext) { + BatchMode(final Future pipelineExecution, final JavaSparkContext javaSparkContext) { super(pipelineExecution, javaSparkContext); } @@ -156,15 +150,13 @@ public abstract class SparkPipelineResult implements PipelineResult { } } - /** - * Represents a streaming Spark pipeline result. - */ + /** Represents a streaming Spark pipeline result. 
*/ static class StreamingMode extends SparkPipelineResult { private final JavaStreamingContext javaStreamingContext; - StreamingMode(final Future pipelineExecution, - final JavaStreamingContext javaStreamingContext) { + StreamingMode( + final Future pipelineExecution, final JavaStreamingContext javaStreamingContext) { super(pipelineExecution, javaStreamingContext.sparkContext()); this.javaStreamingContext = javaStreamingContext; } @@ -176,7 +168,7 @@ public abstract class SparkPipelineResult implements PipelineResult { // calling the StreamingContext's waiter with 0 msec will throw any error that might have // been thrown during the "grace period". try { - javaStreamingContext.awaitTermination(0); + javaStreamingContext.awaitTerminationOrTimeout(0); } catch (Exception e) { throw beamExceptionFrom(e); } finally { @@ -188,24 +180,24 @@ public abstract class SparkPipelineResult implements PipelineResult { } @Override - protected State awaitTermination(final Duration duration) throws ExecutionException, - InterruptedException { + protected State awaitTermination(final Duration duration) + throws ExecutionException, InterruptedException { pipelineExecution.get(); // execution is asynchronous anyway so no need to time-out. javaStreamingContext.awaitTerminationOrTimeout(duration.getMillis()); State terminationState; switch (javaStreamingContext.getState()) { - case ACTIVE: - terminationState = State.RUNNING; - break; - case STOPPED: - terminationState = State.DONE; - break; - default: - terminationState = State.UNKNOWN; - break; - } - return terminationState; + case ACTIVE: + terminationState = State.RUNNING; + break; + case STOPPED: + terminationState = State.DONE; + break; + default: + terminationState = State.UNKNOWN; + break; + } + return terminationState; } } @@ -216,5 +208,4 @@ public abstract class SparkPipelineResult implements PipelineResult { stop(); } } - } http://git-wip-us.apache.org/repos/asf/beam/blob/6671b5b6/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java index 97532c4..5b4f73e 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java @@ -65,40 +65,32 @@ import org.apache.spark.streaming.api.java.JavaStreamingListenerWrapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - /** - * The SparkRunner translate operations defined on a pipeline to a representation - * executable by Spark, and then submitting the job to Spark to be executed. If we wanted to run - * a Beam pipeline with the default options of a single threaded spark instance in local mode, - * we would do the following: + * The SparkRunner translate operations defined on a pipeline to a representation executable by + * Spark, and then submitting the job to Spark to be executed. If we wanted to run a Beam pipeline + * with the default options of a single threaded spark instance in local mode, we would do the + * following: * - * {@code - * Pipeline p = [logic for pipeline creation] - * SparkPipelineResult result = (SparkPipelineResult) p.run(); - * } + *

{@code Pipeline p = [logic for pipeline creation] SparkPipelineResult result = + * (SparkPipelineResult) p.run(); } * *

To create a pipeline runner to run against a different spark cluster, with a custom master url * we would do the following: * - * {@code - * Pipeline p = [logic for pipeline creation] - * SparkPipelineOptions options = SparkPipelineOptionsFactory.create(); - * options.setSparkMaster("spark://host:port"); - * SparkPipelineResult result = (SparkPipelineResult) p.run(); - * } + *

{@code Pipeline p = [logic for pipeline creation] SparkPipelineOptions options = + * SparkPipelineOptionsFactory.create(); options.setSparkMaster("spark://host:port"); + * SparkPipelineResult result = (SparkPipelineResult) p.run(); } */ public final class SparkRunner extends PipelineRunner { private static final Logger LOG = LoggerFactory.getLogger(SparkRunner.class); - /** - * Options used in this pipeline runner. - */ + /** Options used in this pipeline runner. */ private final SparkPipelineOptions mOptions; /** - * Creates and returns a new SparkRunner with default options. In particular, against a - * spark instance running in local mode. + * Creates and returns a new SparkRunner with default options. In particular, against a spark + * instance running in local mode. * * @return A pipeline runner with default options. */ @@ -156,11 +148,11 @@ public final class SparkRunner extends PipelineRunner { if (mOptions.isStreaming()) { CheckpointDir checkpointDir = new CheckpointDir(mOptions.getCheckpointDir()); - final SparkRunnerStreamingContextFactory contextFactory = + SparkRunnerStreamingContextFactory streamingContextFactory = new SparkRunnerStreamingContextFactory(pipeline, mOptions, checkpointDir); final JavaStreamingContext jssc = - JavaStreamingContext.getOrCreate(checkpointDir.getSparkCheckpointDir().toString(), - contextFactory); + JavaStreamingContext.getOrCreate( + checkpointDir.getSparkCheckpointDir().toString(), streamingContextFactory); // Checkpoint aggregator/metrics values jssc.addStreamingListener( @@ -171,7 +163,7 @@ public final class SparkRunner extends PipelineRunner { new MetricsAccumulator.AccumulatorCheckpointingSparkListener())); // register user-defined listeners. - for (JavaStreamingListener listener: mOptions.as(SparkContextOptions.class).getListeners()) { + for (JavaStreamingListener listener : mOptions.as(SparkContextOptions.class).getListeners()) { LOG.info("Registered listener {}." + listener.getClass().getSimpleName()); jssc.addStreamingListener(new JavaStreamingListenerWrapper(listener)); } @@ -185,14 +177,16 @@ public final class SparkRunner extends PipelineRunner { // but this is fine since it is idempotent). 
initAccumulators(mOptions, jssc.sparkContext()); - startPipeline = executorService.submit(new Runnable() { + startPipeline = + executorService.submit( + new Runnable() { - @Override - public void run() { - LOG.info("Starting streaming pipeline execution."); - jssc.start(); - } - }); + @Override + public void run() { + LOG.info("Starting streaming pipeline execution."); + jssc.start(); + } + }); result = new SparkPipelineResult.StreamingMode(startPipeline, jssc); } else { @@ -206,15 +200,17 @@ public final class SparkRunner extends PipelineRunner { initAccumulators(mOptions, jsc); - startPipeline = executorService.submit(new Runnable() { + startPipeline = + executorService.submit( + new Runnable() { - @Override - public void run() { - pipeline.traverseTopologically(new Evaluator(translator, evaluationContext)); - evaluationContext.computeOutputs(); - LOG.info("Batch pipeline execution complete."); - } - }); + @Override + public void run() { + pipeline.traverseTopologically(new Evaluator(translator, evaluationContext)); + evaluationContext.computeOutputs(); + LOG.info("Batch pipeline execution complete."); + } + }); result = new SparkPipelineResult.BatchMode(startPipeline, jsc); } @@ -227,30 +223,28 @@ public final class SparkRunner extends PipelineRunner { } private void registerMetricsSource(String appName) { - final MetricsSystem metricsSystem = SparkEnv$.MODULE$.get().metricsSystem(); - final AggregatorMetricSource aggregatorMetricSource = - new AggregatorMetricSource(null, AggregatorsAccumulator.getInstance().value()); - final SparkBeamMetricSource metricsSource = new SparkBeamMetricSource(null); - final CompositeSource compositeSource = - new CompositeSource(appName + ".Beam", metricsSource.metricRegistry(), - aggregatorMetricSource.metricRegistry()); - // re-register the metrics in case of context re-use - metricsSystem.removeSource(compositeSource); - metricsSystem.registerSource(compositeSource); + final MetricsSystem metricsSystem = SparkEnv$.MODULE$.get().metricsSystem(); + final AggregatorMetricSource aggregatorMetricSource = + new AggregatorMetricSource(null, AggregatorsAccumulator.getInstance().value()); + final SparkBeamMetricSource metricsSource = new SparkBeamMetricSource(null); + final CompositeSource compositeSource = + new CompositeSource( + appName + ".Beam", + metricsSource.metricRegistry(), + aggregatorMetricSource.metricRegistry()); + // re-register the metrics in case of context re-use + metricsSystem.removeSource(compositeSource); + metricsSystem.registerSource(compositeSource); } - /** - * Init Metrics/Aggregators accumulators. This method is idempotent. - */ + /** Init Metrics/Aggregators accumulators. This method is idempotent. */ public static void initAccumulators(SparkPipelineOptions opts, JavaSparkContext jsc) { // Init metrics accumulators MetricsAccumulator.init(opts, jsc); AggregatorsAccumulator.init(opts, jsc); } - /** - * Visit the pipeline to determine the translation mode (batch/streaming). - */ + /** Visit the pipeline to determine the translation mode (batch/streaming). */ private void detectTranslationMode(Pipeline pipeline) { TranslationModeDetector detector = new TranslationModeDetector(); pipeline.traverseTopologically(detector); @@ -260,20 +254,14 @@ public final class SparkRunner extends PipelineRunner { } } - /** - * Evaluator that update/populate the cache candidates. - */ + /** Evaluator that update/populate the cache candidates. 
*/ public static void updateCacheCandidates( - Pipeline pipeline, - SparkPipelineTranslator translator, - EvaluationContext evaluationContext) { - CacheVisitor cacheVisitor = new CacheVisitor(translator, evaluationContext); - pipeline.traverseTopologically(cacheVisitor); + Pipeline pipeline, SparkPipelineTranslator translator, EvaluationContext evaluationContext) { + CacheVisitor cacheVisitor = new CacheVisitor(translator, evaluationContext); + pipeline.traverseTopologically(cacheVisitor); } - /** - * The translation mode of the Beam Pipeline. - */ + /** The translation mode of the Beam Pipeline. */ enum TranslationMode { /** Uses the batch mode. */ BATCH, @@ -281,9 +269,7 @@ public final class SparkRunner extends PipelineRunner { STREAMING } - /** - * Traverses the Pipeline to determine the {@link TranslationMode} for this pipeline. - */ + /** Traverses the Pipeline to determine the {@link TranslationMode} for this pipeline. */ private static class TranslationModeDetector extends Pipeline.PipelineVisitor.Defaults { private static final Logger LOG = LoggerFactory.getLogger(TranslationModeDetector.class); private static final Collection> UNBOUNDED_INPUTS = @@ -315,14 +301,11 @@ public final class SparkRunner extends PipelineRunner { } } - /** - * Traverses the pipeline to populate the candidates for caching. - */ + /** Traverses the pipeline to populate the candidates for caching. */ static class CacheVisitor extends Evaluator { protected CacheVisitor( - SparkPipelineTranslator translator, - EvaluationContext evaluationContext) { + SparkPipelineTranslator translator, EvaluationContext evaluationContext) { super(translator, evaluationContext); } @@ -345,9 +328,7 @@ public final class SparkRunner extends PipelineRunner { } } - /** - * Evaluator on the pipeline. - */ + /** Evaluator on the pipeline. */ @SuppressWarnings("WeakerAccess") public static class Evaluator extends Pipeline.PipelineVisitor.Defaults { private static final Logger LOG = LoggerFactory.getLogger(Evaluator.class); @@ -399,7 +380,9 @@ public final class SparkRunner extends PipelineRunner { } // defer if sideInputs are defined. if (hasSideInput) { - LOG.info("Deferring combine transformation {} for job {}", transform, + LOG.info( + "Deferring combine transformation {} for job {}", + transform, ctxt.getPipeline().getOptions().getJobName()); return true; } @@ -412,14 +395,14 @@ public final class SparkRunner extends PipelineRunner { doVisitTransform(node); } - > void - doVisitTransform(TransformHierarchy.Node node) { + > void doVisitTransform( + TransformHierarchy.Node node) { @SuppressWarnings("unchecked") TransformT transform = (TransformT) node.getTransform(); @SuppressWarnings("unchecked") Class transformClass = (Class) (Class) transform.getClass(); - @SuppressWarnings("unchecked") TransformEvaluator evaluator = - translate(node, transform, transformClass); + @SuppressWarnings("unchecked") + TransformEvaluator evaluator = translate(node, transform, transformClass); LOG.info("Evaluating {}", transform); AppliedPTransform appliedTransform = node.toAppliedPTransform(); ctxt.setCurrentTransform(appliedTransform); @@ -432,7 +415,7 @@ public final class SparkRunner extends PipelineRunner { * translate with the proper translator. */ protected > - TransformEvaluator translate( + TransformEvaluator translate( TransformHierarchy.Node node, TransformT transform, Class transformClass) { //--- determine if node is bounded/unbounded. 
// usually, the input determines if the PCollection to apply the next transformation to @@ -449,7 +432,7 @@ public final class SparkRunner extends PipelineRunner { LOG.debug("Translating {} as {}", transform, isNodeBounded); return isNodeBounded.equals(PCollection.IsBounded.BOUNDED) ? translator.translateBounded(transformClass) - : translator.translateUnbounded(transformClass); + : translator.translateUnbounded(transformClass); } protected PCollection.IsBounded isBoundedCollection(Collection pValues) { @@ -458,7 +441,7 @@ public final class SparkRunner extends PipelineRunner { // BOUNDED behaves as the Identity Element, BOUNDED + BOUNDED = BOUNDED // while BOUNDED + UNBOUNDED = UNBOUNDED. PCollection.IsBounded isBounded = PCollection.IsBounded.BOUNDED; - for (TaggedPValue pValue: pValues) { + for (TaggedPValue pValue : pValues) { if (pValue.getValue() instanceof PCollection) { isBounded = isBounded.and(((PCollection) pValue.getValue()).isBounded()); } else { http://git-wip-us.apache.org/repos/asf/beam/blob/6671b5b6/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java index 3f2c10a..b7bfeed 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java @@ -87,7 +87,7 @@ class SourceDStream this.boundReadDuration = boundReadDuration(options.getReadTimePercentage(), options.getMinReadTimeMillis()); // set initial parallelism once. - this.initialParallelism = ssc().sc().defaultParallelism(); + this.initialParallelism = ssc().sparkContext().defaultParallelism(); checkArgument(this.initialParallelism > 0, "Number of partitions must be greater than zero."); this.boundMaxRecords = boundMaxRecords > 0 ? 
boundMaxRecords : rateControlledMaxRecords(); @@ -106,7 +106,7 @@ class SourceDStream public scala.Option, CheckpointMarkT>>> compute(Time validTime) { RDD, CheckpointMarkT>> rdd = new SourceRDD.Unbounded<>( - ssc().sc(), + ssc().sparkContext(), runtimeContext, createMicrobatchSource(), numPartitions); http://git-wip-us.apache.org/repos/asf/beam/blob/6671b5b6/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java index 98521e9..2dd18f3 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java @@ -32,47 +32,47 @@ import org.apache.beam.sdk.Pipeline; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function0; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.JavaStreamingContext; -import org.apache.spark.streaming.api.java.JavaStreamingContextFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - /** * A {@link JavaStreamingContext} factory for resilience. - * @see how-to-configure-checkpointing + * + * @see how-to-configure-checkpointing */ -public class SparkRunnerStreamingContextFactory implements JavaStreamingContextFactory { +public class SparkRunnerStreamingContextFactory implements Function0 { private static final Logger LOG = LoggerFactory.getLogger(SparkRunnerStreamingContextFactory.class); - private final Pipeline pipeline; - private final SparkPipelineOptions options; - private final CheckpointDir checkpointDir; + // set members as transient to satisfy findbugs and since this only runs in driver. + private final transient Pipeline pipeline; + private final transient SparkPipelineOptions options; + private final transient CheckpointDir checkpointDir; public SparkRunnerStreamingContextFactory( - Pipeline pipeline, - SparkPipelineOptions options, - CheckpointDir checkpointDir) { + Pipeline pipeline, SparkPipelineOptions options, CheckpointDir checkpointDir) { this.pipeline = pipeline; this.options = options; this.checkpointDir = checkpointDir; } - private EvaluationContext ctxt; - @Override - public JavaStreamingContext create() { + public JavaStreamingContext call() throws Exception { LOG.info("Creating a new Spark Streaming Context"); // validate unbounded read properties. 
- checkArgument(options.getMinReadTimeMillis() < options.getBatchIntervalMillis(), + checkArgument( + options.getMinReadTimeMillis() < options.getBatchIntervalMillis(), "Minimum read time has to be less than batch time."); - checkArgument(options.getReadTimePercentage() > 0 && options.getReadTimePercentage() < 1, + checkArgument( + options.getReadTimePercentage() > 0 && options.getReadTimePercentage() < 1, "Read time percentage is bound to (0, 1)."); - SparkPipelineTranslator translator = new StreamingTransformTranslator.Translator( - new TransformTranslator.Translator()); + SparkPipelineTranslator translator = + new StreamingTransformTranslator.Translator(new TransformTranslator.Translator()); Duration batchDuration = new Duration(options.getBatchIntervalMillis()); LOG.info("Setting Spark streaming batchDuration to {} msec", batchDuration.milliseconds()); @@ -82,24 +82,25 @@ public class SparkRunnerStreamingContextFactory implements JavaStreamingContextF // We must first init accumulators since translators expect them to be instantiated. SparkRunner.initAccumulators(options, jsc); - ctxt = new EvaluationContext(jsc, pipeline, jssc); + EvaluationContext ctxt = new EvaluationContext(jsc, pipeline, jssc); // update cache candidates SparkRunner.updateCacheCandidates(pipeline, translator, ctxt); pipeline.traverseTopologically(new SparkRunner.Evaluator(translator, ctxt)); ctxt.computeOutputs(); - checkpoint(jssc); + checkpoint(jssc, checkpointDir); return jssc; } - private void checkpoint(JavaStreamingContext jssc) { + private void checkpoint(JavaStreamingContext jssc, CheckpointDir checkpointDir) { Path rootCheckpointPath = checkpointDir.getRootCheckpointDir(); Path sparkCheckpointPath = checkpointDir.getSparkCheckpointDir(); Path beamCheckpointPath = checkpointDir.getBeamCheckpointDir(); try { - FileSystem fileSystem = rootCheckpointPath.getFileSystem(jssc.sc().hadoopConfiguration()); + FileSystem fileSystem = + rootCheckpointPath.getFileSystem(jssc.sparkContext().hadoopConfiguration()); if (!fileSystem.exists(rootCheckpointPath)) { fileSystem.mkdirs(rootCheckpointPath); }
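The tail of SparkRunnerStreamingContextFactory above creates the checkpoint directories through the Hadoop FileSystem API obtained from jssc.sparkContext().hadoopConfiguration(), the non-deprecated replacement for jssc.sc(). A minimal standalone helper in the same spirit might look like the following sketch; the class name, method name, and directory argument are illustrative and not part of the commit.

----------------------------------------------------------------------
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public final class CheckpointDirHelper {

  /**
   * Ensures the checkpoint directory exists and registers it with the streaming context.
   * Loosely modeled on the checkpoint(...) helper in the patch above.
   */
  static void ensureCheckpointDir(JavaStreamingContext jssc, String dir) throws IOException {
    Path checkpointPath = new Path(dir);
    // jssc.sparkContext() is the non-deprecated way to reach the Hadoop configuration.
    Configuration hadoopConf = jssc.sparkContext().hadoopConfiguration();
    FileSystem fs = checkpointPath.getFileSystem(hadoopConf);
    if (!fs.exists(checkpointPath)) {
      fs.mkdirs(checkpointPath);
    }
    jssc.checkpoint(checkpointPath.toString());
  }

  private CheckpointDirHelper() {}
}
----------------------------------------------------------------------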