Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id B95DB200B7D for ; Sat, 10 Sep 2016 16:41:12 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id B8335160ADC; Sat, 10 Sep 2016 14:41:12 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id E5097160AD2 for ; Sat, 10 Sep 2016 16:41:11 +0200 (CEST) Received: (qmail 97072 invoked by uid 500); 10 Sep 2016 14:41:11 -0000 Mailing-List: contact commits-help@cxf.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cxf.apache.org Delivered-To: mailing list commits@cxf.apache.org Received: (qmail 96995 invoked by uid 99); 10 Sep 2016 14:41:10 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 10 Sep 2016 14:41:10 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6D248EEE1B; Sat, 10 Sep 2016 14:41:10 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: reta@apache.org To: commits@cxf.apache.org Date: Sat, 10 Sep 2016 14:41:18 -0000 Message-Id: <6e7a9ac8eed64423965632816abc77ec@git.apache.org> In-Reply-To: <2379b058510a4065acefc49ed97152b0@git.apache.org> References: <2379b058510a4065acefc49ed97152b0@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [10/37] cxf git commit: [CXF-6618] Simplifying the sync code archived-at: Sat, 10 Sep 2016 14:41:12 -0000 [CXF-6618] Simplifying the sync code Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/203b5433 Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/203b5433 Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/203b5433 Branch: refs/heads/master-jaxrs-2.1 Commit: 203b5433bc44aa831bdd9b1dd4ab474d192dea6c Parents: e6d84cf Author: Sergey Beryozkin Authored: Tue Sep 6 13:55:53 2016 +0100 Committer: Sergey Beryozkin Committed: Tue Sep 6 13:55:53 2016 +0100 ---------------------------------------------------------------------- .../jaxrs/server/SparkStreamingListener.java | 5 +--- .../demo/jaxrs/server/SparkStreamingOutput.java | 28 +++++++------------- .../demo/jaxrs/server/StreamingService.java | 5 ++-- 3 files changed, 13 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/203b5433/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java ---------------------------------------------------------------------- diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java index 3ee5558..1881857 100644 --- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java +++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java @@ -19,10 +19,6 @@ public class SparkStreamingListener implements StreamingListener { @Override public void onBatchCompleted(StreamingListenerBatchCompleted event) { - // as soon as the batch is finished we let the streaming context go - // but this may need to be revisited if a given InputStream happens to be processed in - // multiple batches ? - sparkStreamingOutput.setBatchCompleted(); } @Override @@ -35,6 +31,7 @@ public class SparkStreamingListener implements StreamingListener { @Override public void onOutputOperationCompleted(StreamingListenerOutputOperationCompleted event) { + sparkStreamingOutput.setOperationCompleted(); } @Override http://git-wip-us.apache.org/repos/asf/cxf/blob/203b5433/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java ---------------------------------------------------------------------- diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java index e3ac218..e28bb5a 100644 --- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java +++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java @@ -33,8 +33,7 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext; public class SparkStreamingOutput implements StreamingOutput { private JavaPairDStream wordCounts; private JavaStreamingContext jssc; - private boolean sparkDone; - private boolean batchCompleted; + private boolean operationCompleted; public SparkStreamingOutput(JavaStreamingContext jssc, JavaPairDStream wordCounts) { this.jssc = jssc; this.wordCounts = wordCounts; @@ -44,13 +43,13 @@ public class SparkStreamingOutput implements StreamingOutput { public void write(final OutputStream output) throws IOException, WebApplicationException { wordCounts.foreachRDD(new OutputFunction(output)); jssc.start(); - awaitTermination(); + waitForOperationCompleted(); jssc.stop(false); jssc.close(); } - private synchronized void awaitTermination() { - while (!sparkDone) { + private synchronized void waitForOperationCompleted() { + while (!operationCompleted) { try { wait(); } catch (InterruptedException e) { @@ -58,18 +57,14 @@ public class SparkStreamingOutput implements StreamingOutput { } } } - private synchronized void releaseStreamingContext() { - if (batchCompleted) { - sparkDone = true; - notify(); - } - } - - public synchronized void setBatchCompleted() { - batchCompleted = true; - } + public synchronized void setOperationCompleted() { + this.operationCompleted = true; + notify(); + } + + // This dedicated class was introduced to validate that when Spark is running it does not // fail the processing due to OutputStream being one of the fields in the serializable class, private class OutputFunction implements VoidFunction> { @@ -89,9 +84,6 @@ public class SparkStreamingOutput implements StreamingOutput { throw new WebApplicationException(); } } - // Right now we assume by the time we call it the whole InputStream has been - // processed - releaseStreamingContext(); } } http://git-wip-us.apache.org/repos/asf/cxf/blob/203b5433/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java ---------------------------------------------------------------------- diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java index 19d1dac..7d14e9e 100644 --- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java +++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java @@ -52,9 +52,9 @@ import scala.Tuple2; public class StreamingService { private Executor executor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS, new ArrayBlockingQueue(10)); - private SparkConf sparkConf; + private JavaStreamingContext jssc; public StreamingService(SparkConf sparkConf) { - this.sparkConf = sparkConf; + jssc = new JavaStreamingContext(sparkConf, Durations.seconds(1)); } @POST @@ -63,7 +63,6 @@ public class StreamingService { @Produces("text/plain") public void getStream(@Suspended AsyncResponse async, InputStream is) { try { - JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(1)); JavaReceiverInputDStream receiverStream = jssc.receiverStream(new InputStreamReceiver(is)); SparkStreamingOutput streamOut = new SparkStreamingOutput(jssc,