From: reta@apache.org
To: commits@cxf.apache.org
Reply-To: dev@cxf.apache.org
Date: Sat, 10 Sep 2016 14:41:27 -0000
Subject: [19/37] cxf git commit: [CXF-6618] Initial code for processing binary data with Tika and pushing them to Spark

[CXF-6618] Initial code for processing binary data with Tika and pushing them to Spark

Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/4427f779
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/4427f779
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/4427f779

Branch: refs/heads/master-jaxrs-2.1
Commit: 4427f7790e522144baa325e5b5c051885e9dc706
Parents: d68d8d8
Author: Sergey Beryozkin
Authored: Thu Sep 8 13:36:36 2016 +0100
Committer: Sergey Beryozkin
Committed: Thu Sep 8 13:36:36 2016 +0100

----------------------------------------------------------------------
 .../release/samples/jax_rs/spark/README.txt     | 18 +++--
 .../main/release/samples/jax_rs/spark/pom.xml   | 15 ++++
 .../jaxrs/server/AdvancedStreamingService.java  | 80 --------------------
 .../src/main/java/demo/jaxrs/server/Server.java |  2 +-
 .../demo/jaxrs/server/SparkStreamingOutput.java |  4 +-
 .../demo/jaxrs/server/StreamingService.java     | 37 ++++++++-
 .../java/demo/jaxrs/server/TikaReceiver.java    | 43 +++++++++++
 .../spark/src/main/resources/multipartForm.html | 49 ++++++++++++
 8 files changed, 154 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/4427f779/distribution/src/main/release/samples/jax_rs/spark/README.txt
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/README.txt b/distribution/src/main/release/samples/jax_rs/spark/README.txt
index e6a218d..8a7b292 100644
--- a/distribution/src/main/release/samples/jax_rs/spark/README.txt
+++ b/distribution/src/main/release/samples/jax_rs/spark/README.txt
@@ -1,5 +1,5 @@
-JAX-RS Basic Spark Demo
-=======================
+JAX-RS Spark Streaming Demo
+===========================
 
 This demo demonstrates how to connect HTTP and Spark streams with JAX-RS
 
@@ -9,15 +9,17 @@ mvn exec:java
 
 Next do:
 
+1. Simple text processing:
+
 curl -X POST -H "Accept: text/plain" -H "Content-Type: text/plain" -d "Hello Spark" http://localhost:9000/stream
 
-Limitations:
+2. PDF processing:
 
-This demo accepts one request at a time due to Spark restricting that only a single streaming context can be active
-in JVM at a given moment of time. This is the error which will be logged if you try to access the demo server concurrently:
+Open multipart.html located in src/main/resources, locate any PDF file available on the local disk and upload.
+Note Spark restricts that only a single streaming context can be active in JVM at a given moment of time.
+This is the error which will be logged if you try to access the demo server concurrently:
 "org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243).
- To ignore this error, set spark.driver.allowMultipleContexts = true".
-
- More flexible demo server will be added in due time.
+
+To ignore this error, set spark.driver.allowMultipleContexts = true".


http://git-wip-us.apache.org/repos/asf/cxf/blob/4427f779/distribution/src/main/release/samples/jax_rs/spark/pom.xml
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/pom.xml b/distribution/src/main/release/samples/jax_rs/spark/pom.xml
index b2541a0..10a00da 100644
--- a/distribution/src/main/release/samples/jax_rs/spark/pom.xml
+++ b/distribution/src/main/release/samples/jax_rs/spark/pom.xml
@@ -44,6 +44,11 @@
             <version>3.2.0-SNAPSHOT</version>
         </dependency>
         <dependency>
+            <groupId>org.apache.cxf</groupId>
+            <artifactId>cxf-rt-rs-extension-search</artifactId>
+            <version>3.2.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-streaming_2.10</artifactId>
             <version>2.0.0-preview</version>
@@ -59,6 +64,16 @@
             <artifactId>netty</artifactId>
             <version>3.7.0.Final</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.tika</groupId>
+            <artifactId>tika-core</artifactId>
+            <version>2.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.tika</groupId>
+            <artifactId>tika-parser-pdf-module</artifactId>
+            <version>2.0-SNAPSHOT</version>
+        </dependency>


http://git-wip-us.apache.org/repos/asf/cxf/blob/4427f779/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/AdvancedStreamingService.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/AdvancedStreamingService.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/AdvancedStreamingService.java
deleted file mode 100644
index 1971fd9..0000000
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/AdvancedStreamingService.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package demo.jaxrs.server;
-
-import java.io.InputStream;
-
-import javax.ws.rs.Consumes;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
-import javax.ws.rs.core.StreamingOutput;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.streaming.Durations;
-import org.apache.spark.streaming.StreamingContext;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.dstream.ReceiverInputDStream;
-import org.apache.spark.streaming.receiver.Receiver;
-
-import scala.reflect.ClassTag;
-
-// INCOMPLETE
-
-@Path("/")
-public class AdvancedStreamingService {
-    private JavaStreamingContext jssc;
-    public AdvancedStreamingService(SparkConf sparkConf) {
-        this.jssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
-        new MyReceiverInputDStream(jssc.ssc(),
-                                   scala.reflect.ClassTag$.MODULE$.apply(String.class));
-    }
-
-    @POST
-    @Path("/stream")
-    @Consumes("text/plain")
-    @Produces("text/plain")
-    public StreamingOutput getStream(InputStream is) {
-
-        return null;
-    }
-
-
-    public static class MyReceiverInputDStream extends ReceiverInputDStream<String> {
-
-        public MyReceiverInputDStream(StreamingContext ssc, ClassTag<String> evidence) {
-            super(ssc, evidence);
-        }
-        public void putInputStream(InputStream is) {
-
-        }
-        @Override
-        public Receiver<String> getReceiver() {
-            // A receiver can be created per every String the input stream
-            return new InputStreamReceiver(getInputStream());
-        }
-        public InputStream getInputStream() {
-            // TODO Auto-generated method stub
-            return null;
-        }
-    }
-
-
-
-}


http://git-wip-us.apache.org/repos/asf/cxf/blob/4427f779/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java
index 50f915a..8a1092f 100644
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java
@@ -30,7 +30,7 @@ public class Server {
         sf.setResourceClasses(StreamingService.class);
         sf.setResourceProvider(StreamingService.class,
             new SingletonResourceProvider(new StreamingService()));
-        sf.setAddress("http://localhost:9000/");
+        sf.setAddress("http://localhost:9000/spark");
 
         sf.create();
     }


http://git-wip-us.apache.org/repos/asf/cxf/blob/4427f779/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
index 43166fe..7324806 100644
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
@@ -35,16 +35,18 @@ public class SparkStreamingOutput implements StreamingOutput {
 
     private JavaStreamingContext jssc;
     private volatile boolean sparkBatchCompleted;
+    private volatile boolean outputWriteDone;
 
     public SparkStreamingOutput(JavaStreamingContext jssc) {
         this.jssc = jssc;
     }
     @Override
     public void write(final OutputStream output) throws IOException, WebApplicationException {
-        while (!sparkBatchCompleted || !responseQueue.isEmpty()) {
+        while (!sparkBatchCompleted || !outputWriteDone || !responseQueue.isEmpty()) {
             try {
                 String responseEntry = responseQueue.poll(1, TimeUnit.MILLISECONDS);
                 if (responseEntry != null) {
+                    outputWriteDone = true;
                     output.write(StringUtils.toBytesUTF8(responseEntry));
                     output.flush();
                 }


http://git-wip-us.apache.org/repos/asf/cxf/blob/4427f779/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
index c9cc033..5e059fc 100644
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
@@ -22,6 +22,7 @@ import java.io.InputStream;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Random;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -35,6 +36,11 @@ import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.container.Suspended;
 
+import org.apache.cxf.common.util.Base64Utility;
+import org.apache.cxf.jaxrs.ext.multipart.Attachment;
+import org.apache.cxf.jaxrs.ext.multipart.Multipart;
+import org.apache.cxf.jaxrs.ext.search.tika.TikaContentExtractor;
+import org.apache.cxf.jaxrs.ext.search.tika.TikaContentExtractor.TikaContent;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkException;
 import org.apache.spark.api.java.JavaPairRDD;
@@ -47,6 +53,8 @@ import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaPairDStream;
 import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.receiver.Receiver;
+import org.apache.tika.parser.pdf.PDFParser;
 
 import scala.Tuple2;
 
@@ -59,20 +67,35 @@ public class StreamingService {
     }
 
     @POST
+    @Path("/multipart")
+    @Consumes("multipart/form-data")
+    @Produces("text/plain")
+    public void processMultipartStream(@Suspended AsyncResponse async,
+                                       @Multipart("file") Attachment att) {
+        TikaContentExtractor tika = new TikaContentExtractor(new PDFParser());
+        TikaContent tikaContent = tika.extract(att.getObject(InputStream.class));
+        processStream(async, new TikaReceiver(tikaContent));
+    }
+
+    @POST
     @Path("/stream")
     @Consumes("text/plain")
     @Produces("text/plain")
-    public void getStream(@Suspended AsyncResponse async, InputStream is) {
+    public void processSimpleStream(@Suspended AsyncResponse async, InputStream is) {
+        processStream(async, new InputStreamReceiver(is));
+    }
+
+    private void processStream(AsyncResponse async, Receiver<String> receiver) {
         try {
-            SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("JAX-RS Spark Connect");
+            SparkConf sparkConf = new SparkConf().setMaster("local[*]")
+                .setAppName("JAX-RS Spark Connect " + getRandomId());
             JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
             SparkStreamingOutput streamOut = new SparkStreamingOutput(jssc);
             SparkStreamingListener sparkListener = new SparkStreamingListener(streamOut);
             jssc.addStreamingListener(sparkListener);
 
-            JavaReceiverInputDStream<String> receiverStream =
-                jssc.receiverStream(new InputStreamReceiver(is));
+            JavaReceiverInputDStream<String> receiverStream = jssc.receiverStream(receiver);
             JavaPairDStream<String, Integer> wordCounts = createOutputDStream(receiverStream);
             wordCounts.foreachRDD(new OutputFunction(streamOut));
             jssc.start();
@@ -87,6 +110,12 @@ public class StreamingService {
             }
         }
     }
+
+    private static String getRandomId() {
+        byte[] bytes = new byte[10];
+        new Random().nextBytes(bytes);
+        return Base64Utility.encode(bytes);
+    }
 
     @SuppressWarnings("serial")
     private static JavaPairDStream<String, Integer> createOutputDStream(
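The SparkStreamingOutput change makes write() keep polling until at least one entry has actually been written, instead of stopping as soon as the batch-completed flag is set and the queue happens to be empty; presumably this guards against the completion signal arriving before any result has been queued. The plain-Java sketch below models that loop outside Spark and CXF. The class name DrainLoopSketch, its addResponseEntry/setSparkBatchCompleted methods and the LinkedBlockingQueue are assumptions, since the diff does not show how responseQueue is declared or filled.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical stand-alone model of the SparkStreamingOutput.write() loop.
 * A producer (standing in for the Spark batch) queues entries and signals
 * completion; write() drains the queue into the HTTP output stream.
 */
public class DrainLoopSketch {
    // Assumed queue type; the diff does not show the real declaration.
    private final BlockingQueue<String> responseQueue = new LinkedBlockingQueue<>();
    private volatile boolean sparkBatchCompleted;
    private volatile boolean outputWriteDone;

    public void addResponseEntry(String entry) {
        responseQueue.offer(entry);
    }

    public void setSparkBatchCompleted() {
        sparkBatchCompleted = true;
    }

    public void write(OutputStream output) throws IOException, InterruptedException {
        // Same condition as the committed loop: exit only once the batch has completed,
        // at least one entry has been written, and the queue is empty.
        while (!sparkBatchCompleted || !outputWriteDone || !responseQueue.isEmpty()) {
            String entry = responseQueue.poll(1, TimeUnit.MILLISECONDS);
            if (entry != null) {
                outputWriteDone = true;
                output.write(entry.getBytes(StandardCharsets.UTF_8));
                output.flush();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        DrainLoopSketch out = new DrainLoopSketch();
        // Completion is signalled before any result is queued; without the
        // outputWriteDone check, write() could return having written nothing.
        out.setSparkBatchCompleted();
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            out.addResponseEntry("first result\n");
        });
        producer.start();
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        out.write(buffer);
        producer.join();
        System.out.print(buffer.toString(StandardCharsets.UTF_8.name()));
    }
}
```

Read literally, the loop condition also means that a batch which never produces an entry keeps write() spinning indefinitely; the sketch preserves the committed condition rather than adding a timeout.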
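processStream() now names each streaming context "JAX-RS Spark Connect " followed by getRandomId(), which Base64-encodes ten random bytes. The sketch below reproduces that helper with java.util.Base64 swapped in for CXF's Base64Utility; the class name AppNameSketch is hypothetical.

```java
import java.util.Base64;
import java.util.Random;

/**
 * Sketch of the getRandomId() helper added to StreamingService.
 * java.util.Base64 is used here in place of CXF's Base64Utility (a substitution).
 */
public class AppNameSketch {
    static String getRandomId() {
        byte[] bytes = new byte[10];
        new Random().nextBytes(bytes);
        // 10 bytes encode to 16 Base64 characters, the last two being "=" padding.
        return Base64.getEncoder().encodeToString(bytes);
    }

    public static void main(String[] args) {
        String appName = "JAX-RS Spark Connect " + getRandomId();
        System.out.println(appName);
    }
}
```

A distinct application name per request does not lift the one-SparkContext-per-JVM restriction quoted in the README; at most it makes successive contexts easier to tell apart.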


http://git-wip-us.apache.org/repos/asf/cxf/blob/4427f779/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/TikaReceiver.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/TikaReceiver.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/TikaReceiver.java
new file mode 100644
index 0000000..daab2be
--- /dev/null
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/TikaReceiver.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package demo.jaxrs.server;
+
+import org.apache.cxf.jaxrs.ext.search.tika.TikaContentExtractor.TikaContent;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.receiver.Receiver;
+
+public class TikaReceiver extends Receiver<String> {
+
+    private static final long serialVersionUID = 1L;
+    private TikaContent tikaContent;
+
+    public TikaReceiver(TikaContent tikaContent) {
+        super(StorageLevel.MEMORY_ONLY());
+        this.tikaContent = tikaContent;
+    }
+    @Override
+    public void onStart() {
+        super.store(tikaContent.getContent());
+    }
+    @Override
+    public void onStop() {
+        // complete
+    }
+
+}


http://git-wip-us.apache.org/repos/asf/cxf/blob/4427f779/distribution/src/main/release/samples/jax_rs/spark/src/main/resources/multipartForm.html
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/resources/multipartForm.html b/distribution/src/main/release/samples/jax_rs/spark/src/main/resources/multipartForm.html
new file mode 100644
index 0000000..264b4f6
--- /dev/null
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/resources/multipartForm.html
@@ -0,0 +1,49 @@
[49-line HTML upload page, markup not recoverable: title and heading "Form Upload", one "File:" upload field]
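The README asks for the PDF to be uploaded through the HTML form. As a command-line alternative, the sketch below posts a file as multipart/form-data using the java.net.http client (Java 11 or later). The endpoint URL is inferred by combining the new server address (http://localhost:9000/spark) with @Path("/multipart"), and the part name "file" follows @Multipart("file"); the class name PdfUploadSketch and the boundary format are assumptions.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.UUID;

/**
 * Hypothetical command-line client for the /multipart resource.
 * The URL and part name are inferred from Server.setAddress() and @Multipart("file").
 */
public class PdfUploadSketch {
    static final String ENDPOINT = "http://localhost:9000/spark/multipart";

    /** Builds a single-part multipart/form-data body carrying the file bytes. */
    static byte[] buildBody(String boundary, String fileName, byte[] content) throws IOException {
        ByteArrayOutputStream body = new ByteArrayOutputStream();
        String partHeader = "--" + boundary + "\r\n"
            + "Content-Disposition: form-data; name=\"file\"; filename=\"" + fileName + "\"\r\n"
            + "Content-Type: application/pdf\r\n\r\n";
        body.write(partHeader.getBytes(StandardCharsets.UTF_8));
        body.write(content);
        // Closing delimiter ends the multipart body.
        body.write(("\r\n--" + boundary + "--\r\n").getBytes(StandardCharsets.UTF_8));
        return body.toByteArray();
    }

    /** Posts the PDF and returns the status code followed by the text/plain response. */
    static String upload(Path pdf) throws IOException, InterruptedException {
        String boundary = "----sketch" + UUID.randomUUID();
        byte[] body = buildBody(boundary, pdf.getFileName().toString(), Files.readAllBytes(pdf));
        HttpRequest request = HttpRequest.newBuilder(URI.create(ENDPOINT))
            .header("Content-Type", "multipart/form-data; boundary=" + boundary)
            .header("Accept", "text/plain")
            .POST(HttpRequest.BodyPublishers.ofByteArray(body))
            .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
            .send(request, HttpResponse.BodyHandlers.ofString());
        return response.statusCode() + "\n" + response.body();
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            System.out.println("usage: java PdfUploadSketch <file.pdf>");
            return;
        }
        System.out.println(upload(Paths.get(args[0])));
    }
}
```

Run it as `java PdfUploadSketch some.pdf` while the demo server is up; without an argument it only prints a usage line and makes no request. By the same address change, the plain-text resource from the README's curl example would presumably now be reached at http://localhost:9000/spark/stream.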