cxf-commits mailing list archives

From serg...@apache.org
Subject cxf git commit: Updating Spark demo to support one way HTTP requests and process concurrent JAX-RS requests OOB when Socket Receiver is used
Date Mon, 26 Sep 2016 11:31:04 GMT
Repository: cxf
Updated Branches:
  refs/heads/master 21c788a31 -> 0f74af5ed


Updating Spark demo to support one way HTTP requests and process concurrent JAX-RS requests OOB when Socket Receiver is used


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/0f74af5e
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/0f74af5e
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/0f74af5e

Branch: refs/heads/master
Commit: 0f74af5ed7561838e1950c758a7a05e588badd10
Parents: 21c788a
Author: Sergey Beryozkin <sberyozkin@gmail.com>
Authored: Mon Sep 26 12:30:47 2016 +0100
Committer: Sergey Beryozkin <sberyozkin@gmail.com>
Committed: Mon Sep 26 12:30:47 2016 +0100

----------------------------------------------------------------------
 .../release/samples/jax_rs/spark/README.txt     | 17 ++++--
 .../main/java/demo/jaxrs/server/SparkUtils.java | 21 +++++++-
 .../jaxrs/server/simple/StreamingService.java   | 54 +++++++++++++++++++-
 .../java/demo/jaxrs/server/socket/Server.java   | 20 +++++---
 .../java/demo/jaxrs/server/socket/SparkJob.java | 20 +++++---
 .../jaxrs/server/socket/SparkResultJob.java     | 53 +++++++++++++++++++
 .../server/socket/SparkStreamingOutput.java     | 35 +++++++++----
 .../jaxrs/server/socket/StreamingService.java   | 21 ++++++--
 8 files changed, 207 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/0f74af5e/distribution/src/main/release/samples/jax_rs/spark/README.txt
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/README.txt b/distribution/src/main/release/samples/jax_rs/spark/README.txt
index cf61100..c143a85 100644
--- a/distribution/src/main/release/samples/jax_rs/spark/README.txt
+++ b/distribution/src/main/release/samples/jax_rs/spark/README.txt
@@ -32,14 +32,23 @@ Next do:
 
 curl -X POST -H "Accept: text/plain" -H "Content-Type: text/plain" -d "Hello Spark" http://localhost:9000/spark/stream
 
-2. PDF/ODT/ODP processing:
+2. Simple one way text processing:
+
+curl -X POST -H "Accept: text/plain" -H "Content-Type: text/plain" -d "Hello Spark" http://localhost:9000/spark/streamOneWay
+
+3. PDF/ODT/ODP processing:
 
 Open multipart.html located in src/main/resources, locate any PDF or OpenOffice text or presentation file available 
 on the local disk and upload.
 
-Note Spark restricts that only a single streaming context can be active in JVM at a given moment of time. 
-This is the error which will be logged if you try to access the demo server concurrently:
-"org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243).
+Note that Spark allows only a single streaming context to be active in a JVM at any given time.
+demo.jaxrs.server.simple.Server creates a new context for every request, so the following error will be logged
+if you try to access this demo server concurrently:
 
+"org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243).
 To ignore this error, set spark.driver.allowMultipleContexts = true".
 
+However, demo.jaxrs.server.socket.Server creates only a single context, and its JAX-RS frontend can process multiple requests concurrently
+without having to set "spark.driver.allowMultipleContexts = true".
+
+ 
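
For reference, a minimal client-side sketch of calling the new one way endpoint from Java rather than curl. It is illustrative only and not part of this commit: the class name OneWayClientSketch is hypothetical, and it assumes the demo server above is running on localhost:9000 with the CXF JAX-RS client API on the classpath. A @Oneway resource method is dispatched without a response entity, so the HTTP response (typically 202 Accepted) is expected to return before the Spark job completes, with the word counts printed on the server console instead.

import javax.ws.rs.core.Response;

import org.apache.cxf.jaxrs.client.WebClient;

public class OneWayClientSketch {
    public static void main(String[] args) {
        // Hypothetical client for the demo's /spark/streamOneWay resource.
        WebClient client = WebClient.create("http://localhost:9000/spark")
            .path("/streamOneWay")
            .type("text/plain")
            .accept("text/plain");
        Response response = client.post("Hello Spark");
        // Only the acceptance status is returned; the results are printed
        // by the server, not sent back in this response.
        System.out.println("HTTP status: " + response.getStatus());
    }
}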

http://git-wip-us.apache.org/repos/asf/cxf/blob/0f74af5e/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkUtils.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkUtils.java
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkUtils.java
index a01db0f..ff29627 100644
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkUtils.java
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkUtils.java
@@ -45,9 +45,9 @@ public final class SparkUtils {
     }
     
     public static JavaPairDStream<String, Integer> createOutputDStream(
-        JavaDStream<String> receiverStream) {
+        JavaDStream<String> receiverStream, boolean withId) {
         final JavaDStream<String> words = 
-            receiverStream.flatMap(x -> splitInputString(x));
+            receiverStream.flatMap(x -> (withId ? splitInputStringWithId(x) : splitInputString(x)));
             
         final JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> {
                     return new Tuple2<String, Integer>(s, 1);
@@ -69,6 +69,23 @@ public final class SparkUtils {
         }
         return list.iterator();
     }
+    public static Iterator<String> splitInputStringWithId(String x) {
+        int index = x.indexOf(":");
+        String jobId = x.substring(0, index);
+        x = x.substring(index + 1);
+        
+        List<String> list = new LinkedList<String>();
+        for (String s : Arrays.asList(x.split(" "))) {
+            s = s.trim();
+            if (s.endsWith(":") || s.endsWith(",") || s.endsWith(";") || s.endsWith(".")) {
+                s = s.substring(0, s.length() - 1);
+            }
+            if (!s.isEmpty()) {
+                list.add(jobId + ":" + s);
+            }
+        }
+        return list.iterator();
+    }
     public static String getRandomId() {
         byte[] bytes = new byte[10];
         new Random().nextBytes(bytes);
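
For illustration, a small standalone sketch (not part of the commit; the class name SplitWithIdSketch and the "job1" id are sample values) of the convention the new splitInputStringWithId method implements: the job id prefix is peeled off the incoming line and re-attached to every token, so the word counts Spark produces can later be routed back to the originating request.

import java.util.Iterator;

import demo.jaxrs.server.SparkUtils;

public class SplitWithIdSketch {
    public static void main(String[] args) {
        // "job1" stands in for the random job id SparkJob generates at runtime.
        Iterator<String> tokens = SparkUtils.splitInputStringWithId("job1:Hello Spark, hello.");
        while (tokens.hasNext()) {
            // Prints job1:Hello, job1:Spark, job1:hello
            System.out.println(tokens.next());
        }
    }
}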

http://git-wip-us.apache.org/repos/asf/cxf/blob/0f74af5e/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/simple/StreamingService.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/simple/StreamingService.java
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/simple/StreamingService.java
index a561bc0..93dbfb8 100644
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/simple/StreamingService.java
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/simple/StreamingService.java
@@ -38,6 +38,7 @@ import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.container.Suspended;
 import javax.ws.rs.core.MediaType;
 
+import org.apache.cxf.jaxrs.ext.Oneway;
 import org.apache.cxf.jaxrs.ext.multipart.Attachment;
 import org.apache.cxf.jaxrs.ext.multipart.Multipart;
 import org.apache.cxf.jaxrs.ext.search.tika.TikaContentExtractor;
@@ -103,6 +104,13 @@ public class StreamingService {
     public void processSimpleStream(@Suspended AsyncResponse async, InputStream is) {
         processStream(async, SparkUtils.getStringsFromInputStream(is));
     }
+    @POST
+    @Path("/streamOneWay")
+    @Consumes("text/plain")
+    @Oneway
+    public void processSimpleStreamOneWay(InputStream is) {
+        processStreamOneWay(SparkUtils.getStringsFromInputStream(is));
+    }
 
     private void processStream(AsyncResponse async, List<String> inputStrings) {
         try {
@@ -125,7 +133,7 @@ public class StreamingService {
                receiverStream = jssc.receiverStream(new StringListReceiver(inputStrings));
             }
             
-            JavaPairDStream<String, Integer> wordCounts = SparkUtils.createOutputDStream(receiverStream);
+            JavaPairDStream<String, Integer> wordCounts = SparkUtils.createOutputDStream(receiverStream, false);
             wordCounts.foreachRDD(new OutputFunction(streamOut));
             jssc.start();
                                                     
@@ -140,6 +148,31 @@ public class StreamingService {
         }
     }
     
+    private void processStreamOneWay(List<String> inputStrings) {
+        try {
+            SparkConf sparkConf = new SparkConf().setMaster("local[*]")
+                .setAppName("JAX-RS Spark Connect OneWay " + SparkUtils.getRandomId());
+            JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
+            
+            JavaDStream<String> receiverStream = null;
+            if ("queue".equals(receiverType)) {
+               Queue<JavaRDD<String>> rddQueue = new LinkedList<>();
+               for (int i = 0; i < 30; i++) {
+                   rddQueue.add(jssc.sparkContext().parallelize(inputStrings));
+               }
+               receiverStream = jssc.queueStream(rddQueue);
+            } else {
+               receiverStream = jssc.receiverStream(new StringListReceiver(inputStrings));
+            }
+            
+            JavaPairDStream<String, Integer> wordCounts = SparkUtils.createOutputDStream(receiverStream, false);
+            wordCounts.foreachRDD(new PrintOutputFunction(jssc));
+            jssc.start();
+        } catch (Exception ex) {
+            // ignore
+        }
+    }
+    
     
     private static class OutputFunction implements VoidFunction<JavaPairRDD<String, Integer>> {
         private static final long serialVersionUID = 1L;
@@ -156,5 +189,24 @@ public class StreamingService {
         }
         
     }
+    private static class PrintOutputFunction implements VoidFunction<JavaPairRDD<String, Integer>> {
+        private static final long serialVersionUID = 1L;
+        private JavaStreamingContext jssc;
+        PrintOutputFunction(JavaStreamingContext jssc) {
+            this.jssc = jssc;
+        }
+        @Override
+        public void call(JavaPairRDD<String, Integer> rdd) {
+            if (!rdd.collectAsMap().isEmpty()) {
+                for (Map.Entry<String, Integer> entry : rdd.collectAsMap().entrySet()) {
+                    String value = entry.getKey() + " : " + entry.getValue();
+                    System.out.println(value);
+                }
+                jssc.stop(false);
+                jssc.close();
+            }
+        }
+        
+    }
     
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/0f74af5e/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/Server.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/Server.java
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/Server.java
index dc65c9d..fc7968c 100644
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/Server.java
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/Server.java
@@ -62,7 +62,7 @@ public class Server {
         JavaDStream<String> receiverStream = jssc.socketTextStream(
             "localhost", 9999, StorageLevels.MEMORY_ONLY);
         
-        JavaPairDStream<String, Integer> wordCounts = SparkUtils.createOutputDStream(receiverStream);
+        JavaPairDStream<String, Integer> wordCounts = SparkUtils.createOutputDStream(receiverStream, true);
         PrintStream sparkResponseOutputStream = new PrintStream(jaxrsResponseClientSocket.getOutputStream(), true);
         wordCounts.foreachRDD(new SocketOutputFunction(sparkResponseOutputStream));
         
@@ -106,12 +106,20 @@ public class Server {
         }
         @Override
         public void call(JavaPairRDD<String, Integer> rdd) {
-            for (Map.Entry<String, Integer> entry : rdd.collectAsMap().entrySet()) {
-                String value = entry.getKey() + " : " + entry.getValue();
-                streamOut.println(value);
-            }
             if (!rdd.collectAsMap().isEmpty()) {
-                streamOut.println("<batchEnd>");
+                String jobId = null;
+                PrintStream printStream = null;
+                for (Map.Entry<String, Integer> entry : rdd.collectAsMap().entrySet()) {
+                    String value = entry.getKey() + " : " + entry.getValue();
+                    if (jobId == null) {
+                        int index = value.indexOf(":");
+                        jobId = value.substring(0, index);
+                        printStream = "oneway".equals(jobId) ? System.out : streamOut;
+                        
+                    }
+                    printStream.println(value);
+                }
+                printStream.println(jobId + ":" + "<batchEnd>");
             }
         }
         

http://git-wip-us.apache.org/repos/asf/cxf/blob/0f74af5e/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/SparkJob.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/SparkJob.java
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/SparkJob.java
index a24668a..9128a4e 100644
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/SparkJob.java
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/SparkJob.java
@@ -18,30 +18,38 @@
  */
 package demo.jaxrs.server.socket;
 
-import java.io.BufferedReader;
 import java.io.PrintStream;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import javax.ws.rs.container.AsyncResponse;
 
+import demo.jaxrs.server.SparkUtils;
+
 public class SparkJob implements Runnable {
     private AsyncResponse ac;
-    private BufferedReader sparkInputStream;
+    private Map<String, BlockingQueue<String>> sparkResponses;
     private PrintStream sparkOutputStream;
     private List<String> inputStrings;
-    public SparkJob(AsyncResponse ac, BufferedReader sparkInputStream,
+    public SparkJob(AsyncResponse ac, Map<String, BlockingQueue<String>> sparkResponses,
                           PrintStream sparkOutputStream, List<String> inputStrings) {
         this.ac = ac;
         this.inputStrings = inputStrings;
-        this.sparkInputStream = sparkInputStream;
+        this.sparkResponses = sparkResponses;
         this.sparkOutputStream = sparkOutputStream;
     }
     @Override
     public void run() {
+        String jobId = SparkUtils.getRandomId();
+        BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
+        sparkResponses.put(jobId, queue);
+        
         for (String s : inputStrings) {
-            sparkOutputStream.println(s);
+            sparkOutputStream.println(jobId + ":" + s);
         }
-        ac.resume(new SparkStreamingOutput(sparkInputStream));
+        ac.resume(new SparkStreamingOutput(sparkResponses, jobId, queue));
     }
 
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/0f74af5e/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/SparkResultJob.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/SparkResultJob.java
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/SparkResultJob.java
new file mode 100644
index 0000000..eb0286b
--- /dev/null
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/SparkResultJob.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package demo.jaxrs.server.socket;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+
+public class SparkResultJob implements Runnable {
+
+    private Map<String, BlockingQueue<String>> sparkResponses;
+    private BufferedReader sparkInputStream;
+    public SparkResultJob(Map<String, BlockingQueue<String>> sparkResponses,
+                          BufferedReader sparkInputStream) {
+        this.sparkResponses = sparkResponses;
+        this.sparkInputStream = sparkInputStream;
+    }
+
+    
+    @Override
+    public void run() {
+        try {
+            String s = null;
+            while ((s = sparkInputStream.readLine()) != null) {
+                int index = s.indexOf(":");
+                String jobId = s.substring(0, index);
+                String value = s.substring(index + 1);
+                sparkResponses.get(jobId).offer(value);
+            }
+        } catch (IOException ex) {
+            // ignore    
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/0f74af5e/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/SparkStreamingOutput.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/SparkStreamingOutput.java
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/SparkStreamingOutput.java
index cce1275..681395a 100644
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/SparkStreamingOutput.java
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/SparkStreamingOutput.java
@@ -18,29 +18,44 @@
  */
 package demo.jaxrs.server.socket;
 
-import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.PrintStream;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.StreamingOutput;
 
 public class SparkStreamingOutput implements StreamingOutput {
-    private BufferedReader sparkInputStream;
-    public SparkStreamingOutput(BufferedReader sparkInputStream) {
-        this.sparkInputStream = sparkInputStream;
+    private Map<String, BlockingQueue<String>> sparkResponses;
+    private String jobId;
+    private BlockingQueue<String> queue;
+    public SparkStreamingOutput(Map<String, BlockingQueue<String>> sparkResponses, String jobId,
+                                BlockingQueue<String> queue) {
+        this.sparkResponses = sparkResponses;
+        this.jobId = jobId;
+        this.queue = queue;
     }
 
     @Override
     public void write(final OutputStream output) throws IOException, WebApplicationException {
-        PrintStream printStream = new PrintStream(output, true);
-        String s = null;
-        while ((s = sparkInputStream.readLine()) != null) {
-            if ("<batchEnd>".equals(s)) {
-                break;
+        PrintStream out = new PrintStream(output, true);
+        try {
+            while (true) {
+                String responseEntry = queue.poll(1, TimeUnit.MILLISECONDS);
+                if (responseEntry != null) {
+                    if ("<batchEnd>".equals(responseEntry)) {
+                        sparkResponses.remove(jobId);
+                        break;
+                    } else {
+                        out.println(responseEntry);
+                    }
+                }
             }
-            printStream.println(s);
+        } catch (InterruptedException ex) {
+            // ignore
         }
         
     }
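
The three socket-side classes above cooperate through a simple correlation pattern: SparkJob registers a per-request BlockingQueue keyed by a random job id, SparkResultJob demultiplexes the id-prefixed lines coming back from Spark into the matching queue, and SparkStreamingOutput drains that queue until the <batchEnd> marker. Below is a self-contained sketch of the same pattern, illustrative only and not part of the commit: the class name JobIdDemuxSketch and the hard-coded lines stand in for the Spark response socket, and it blocks on take() where the real code polls with a timeout.

import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

public class JobIdDemuxSketch {
    public static void main(String[] args) throws InterruptedException {
        Map<String, BlockingQueue<String>> responses = new ConcurrentHashMap<>();

        // A request handler registers a queue under its job id (see SparkJob).
        String jobId = "job1";
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        responses.put(jobId, queue);

        // A single reader thread routes "jobId:value" lines to the matching queue,
        // standing in for SparkResultJob reading the Spark response socket.
        new Thread(() -> {
            String[] lines = {"job1:Hello : 1", "job1:Spark : 1", "job1:<batchEnd>"};
            for (String line : lines) {
                int index = line.indexOf(':');
                responses.get(line.substring(0, index)).offer(line.substring(index + 1));
            }
        }).start();

        // The response writer drains only its own queue until the end marker,
        // standing in for SparkStreamingOutput.
        String entry;
        while (!"<batchEnd>".equals(entry = queue.take())) {
            System.out.println(entry);
        }
        responses.remove(jobId);
    }
}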

http://git-wip-us.apache.org/repos/asf/cxf/blob/0f74af5e/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/StreamingService.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/StreamingService.java
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/StreamingService.java
index 7fa69df..e0e185b 100644
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/StreamingService.java
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/StreamingService.java
@@ -25,6 +25,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -37,6 +39,7 @@ import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.container.Suspended;
 import javax.ws.rs.core.MediaType;
 
+import org.apache.cxf.jaxrs.ext.Oneway;
 import org.apache.cxf.jaxrs.ext.multipart.Attachment;
 import org.apache.cxf.jaxrs.ext.multipart.Multipart;
 import org.apache.cxf.jaxrs.ext.search.tika.TikaContentExtractor;
@@ -56,13 +59,13 @@ public class StreamingService {
     }
     private Executor executor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS,
                                                        new ArrayBlockingQueue<Runnable>(10));
-    
+    private Map<String, BlockingQueue<String>> sparkResponses = 
+        new ConcurrentHashMap<String, BlockingQueue<String>>();
     private PrintStream sparkOutputStream;
-    private BufferedReader sparkInputStream;
     
     public StreamingService(BufferedReader sparkInputStream, PrintStream sparkOutputStream) {
-        this.sparkInputStream = sparkInputStream;
         this.sparkOutputStream = sparkOutputStream;
+        executor.execute(new SparkResultJob(sparkResponses, sparkInputStream));
     }
     
     @POST
@@ -98,8 +101,16 @@ public class StreamingService {
 
     private void processStream(AsyncResponse async, List<String> inputStrings) {
         executor.execute(
-            new SparkJob(async, sparkInputStream, sparkOutputStream, inputStrings));
+            new SparkJob(async, sparkResponses, sparkOutputStream, inputStrings));
     }
     
-    
+    @POST
+    @Path("/streamOneWay")
+    @Consumes("text/plain")
+    @Oneway
+    public void processSimpleStreamOneWay(InputStream is) {
+        for (String s : SparkUtils.getStringsFromInputStream(is)) {
+            sparkOutputStream.println("oneway:" + s);
+        }
+    }
 }

