Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id CF11D200B90 for ; Sat, 10 Sep 2016 16:41:13 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id CD528160ABE; Sat, 10 Sep 2016 14:41:13 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id AF661160ADA for ; Sat, 10 Sep 2016 16:41:12 +0200 (CEST) Received: (qmail 97690 invoked by uid 500); 10 Sep 2016 14:41:11 -0000 Mailing-List: contact commits-help@cxf.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cxf.apache.org Delivered-To: mailing list commits@cxf.apache.org Received: (qmail 97086 invoked by uid 99); 10 Sep 2016 14:41:11 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 10 Sep 2016 14:41:11 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8E893E056F; Sat, 10 Sep 2016 14:41:10 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: reta@apache.org To: commits@cxf.apache.org Date: Sat, 10 Sep 2016 14:41:16 -0000 Message-Id: <94a736b2ae124c20b41df20eab424c2a@git.apache.org> In-Reply-To: <2379b058510a4065acefc49ed97152b0@git.apache.org> References: <2379b058510a4065acefc49ed97152b0@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [08/37] cxf git commit: Starting introducing an async response support to the Spark demo archived-at: Sat, 10 Sep 2016 14:41:14 -0000 Starting introducing an async response support to the Spark demo Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/da321593 Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/da321593 Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/da321593 Branch: refs/heads/master-jaxrs-2.1 Commit: da321593b597711af7307a0fc2c11de070653356 Parents: df243e3 Author: Sergey Beryozkin Authored: Tue Sep 6 13:11:41 2016 +0100 Committer: Sergey Beryozkin Committed: Tue Sep 6 13:11:41 2016 +0100 ---------------------------------------------------------------------- .../release/samples/jax_rs/spark/README.txt | 2 +- .../demo/jaxrs/server/SparkStreamingOutput.java | 14 +++++------ .../demo/jaxrs/server/StreamingService.java | 25 ++++++++++++++------ 3 files changed, 26 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/da321593/distribution/src/main/release/samples/jax_rs/spark/README.txt ---------------------------------------------------------------------- diff --git a/distribution/src/main/release/samples/jax_rs/spark/README.txt b/distribution/src/main/release/samples/jax_rs/spark/README.txt index 7f94387..e6a218d 100644 --- a/distribution/src/main/release/samples/jax_rs/spark/README.txt +++ b/distribution/src/main/release/samples/jax_rs/spark/README.txt @@ -9,7 +9,7 @@ mvn exec:java Next do: -curl -X POST -H "Accept: text/plain" -H "Content-Type: text/plain" -d "Hello Spark" https://localhost:9000/stream +curl -X POST -H "Accept: text/plain" -H "Content-Type: text/plain" -d "Hello Spark" http://localhost:9000/stream Limitations: http://git-wip-us.apache.org/repos/asf/cxf/blob/da321593/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java ---------------------------------------------------------------------- diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java index b0d5a50..2220dd4 100644 --- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java +++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java @@ -99,7 +99,7 @@ public class SparkStreamingOutput implements StreamingOutput { throw new WebApplicationException(); } } - // Right now we assume by the time we call it the batch the whole InputStream has been + // Right now we assume by the time we call it the whole InputStream has been // processed releaseStreamingContext(); } @@ -116,27 +116,27 @@ public class SparkStreamingOutput implements StreamingOutput { } @Override - public void onBatchStarted(StreamingListenerBatchStarted arg0) { + public void onBatchStarted(StreamingListenerBatchStarted event) { } @Override - public void onBatchSubmitted(StreamingListenerBatchSubmitted arg0) { + public void onBatchSubmitted(StreamingListenerBatchSubmitted event) { } @Override - public void onOutputOperationCompleted(StreamingListenerOutputOperationCompleted arg0) { + public void onOutputOperationCompleted(StreamingListenerOutputOperationCompleted event) { } @Override - public void onOutputOperationStarted(StreamingListenerOutputOperationStarted arg0) { + public void onOutputOperationStarted(StreamingListenerOutputOperationStarted event) { } @Override - public void onReceiverError(StreamingListenerReceiverError arg0) { + public void onReceiverError(StreamingListenerReceiverError event) { } @Override - public void onReceiverStarted(StreamingListenerReceiverStarted arg0) { + public void onReceiverStarted(StreamingListenerReceiverStarted event) { } @Override http://git-wip-us.apache.org/repos/asf/cxf/blob/da321593/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java ---------------------------------------------------------------------- diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java index 801fa55..b65a68b 100644 --- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java +++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java @@ -21,14 +21,18 @@ package demo.jaxrs.server; import java.io.InputStream; import java.util.Arrays; import java.util.Iterator; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import javax.ws.rs.Consumes; import javax.ws.rs.POST; import javax.ws.rs.Path; import javax.ws.rs.Produces; import javax.ws.rs.WebApplicationException; -import javax.ws.rs.core.Response; -import javax.ws.rs.core.StreamingOutput; +import javax.ws.rs.container.AsyncResponse; +import javax.ws.rs.container.Suspended; import org.apache.spark.SparkConf; import org.apache.spark.SparkException; @@ -46,6 +50,8 @@ import scala.Tuple2; @Path("/") public class StreamingService { + private Executor executor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS, + new ArrayBlockingQueue(10)); private SparkConf sparkConf; public StreamingService(SparkConf sparkConf) { this.sparkConf = sparkConf; @@ -55,19 +61,24 @@ public class StreamingService { @Path("/stream") @Consumes("text/plain") @Produces("text/plain") - public StreamingOutput getStream(InputStream is) { + public void getStream(@Suspended AsyncResponse async, InputStream is) { try { JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(1)); JavaReceiverInputDStream receiverStream = jssc.receiverStream(new InputStreamReceiver(is)); - return new SparkStreamingOutput(jssc, - createOutputDStream(receiverStream)); + + executor.execute(new Runnable() { + public void run() { + async.resume(new SparkStreamingOutput(jssc, + createOutputDStream(receiverStream))); + } + }); } catch (Exception ex) { // the compiler does not allow to catch SparkException directly if (ex instanceof SparkException) { - throw new WebApplicationException(Response.status(503).header("Retry-After", "60").build()); + async.cancel(60); } else { - throw new WebApplicationException(ex); + async.resume(new WebApplicationException(ex)); } } }