cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From serg...@apache.org
Subject cxf git commit: Adding README and prototyping AdvancedStreamingService for some future work
Date Thu, 11 Feb 2016 17:53:12 GMT
Repository: cxf
Updated Branches:
  refs/heads/master c63f06f37 -> 22755d26e


Adding README and prototyping AdvancedStreamingService for some future work


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/22755d26
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/22755d26
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/22755d26

Branch: refs/heads/master
Commit: 22755d26ec2f6255004f57deec711ee68ce11d6c
Parents: c63f06f
Author: Sergey Beryozkin <sberyozkin@gmail.com>
Authored: Thu Feb 11 17:52:57 2016 +0000
Committer: Sergey Beryozkin <sberyozkin@gmail.com>
Committed: Thu Feb 11 17:52:57 2016 +0000

----------------------------------------------------------------------
 .../release/samples/jax_rs/spark/README.txt     |  23 ++++
 .../jaxrs/server/AdvancedStreamingService.java  | 115 +++++++++++++++++++
 .../demo/jaxrs/server/InputStreamReceiver.java  |   4 +
 .../demo/jaxrs/server/SparkStreamingOutput.java |   8 ++
 .../demo/jaxrs/server/StreamingService.java     |  18 +--
 5 files changed, 152 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/22755d26/distribution/src/main/release/samples/jax_rs/spark/README.txt
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/README.txt b/distribution/src/main/release/samples/jax_rs/spark/README.txt
new file mode 100644
index 0000000..7f94387
--- /dev/null
+++ b/distribution/src/main/release/samples/jax_rs/spark/README.txt
@@ -0,0 +1,23 @@
+JAX-RS Basic Spark Demo 
+=======================
+
+This demo demonstrates how to connect HTTP and Spark streams with JAX-RS.
+
+Build the demo with "mvn install" and start it with
+
+mvn exec:java
+
+Next do: 
+
+curl -X POST -H "Accept: text/plain" -H "Content-Type: text/plain" -d "Hello Spark" https://localhost:9000/stream
+
+Limitations: 
+
+This demo accepts one request at a time because Spark allows only a single streaming context to be active
+in a JVM at any given moment. This is the error which will be logged if you try to access the demo server concurrently:
+
+"org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243).
+ To ignore this error, set spark.driver.allowMultipleContexts = true".
+ 
+ A more flexible demo server will be added in due time.
+

http://git-wip-us.apache.org/repos/asf/cxf/blob/22755d26/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/AdvancedStreamingService.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/AdvancedStreamingService.java
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/AdvancedStreamingService.java
new file mode 100644
index 0000000..9ccf847
--- /dev/null
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/AdvancedStreamingService.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package demo.jaxrs.server;
+
+import java.io.InputStream;
+import java.util.Arrays;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.StreamingOutput;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.streaming.Durations;
+import org.apache.spark.streaming.StreamingContext;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.dstream.ReceiverInputDStream;
+import org.apache.spark.streaming.receiver.Receiver;
+
+import scala.Tuple2;
+import scala.reflect.ClassTag;
+
+// INCOMPLETE
+
+@Path("/")
+public class AdvancedStreamingService {
+    private JavaStreamingContext jssc;
+    private MyReceiverInputDStream receiverInputDStream;
+    public AdvancedStreamingService(SparkConf sparkConf) {
+        this.jssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
+        this.receiverInputDStream = new MyReceiverInputDStream(jssc.ssc(), 
+                                   scala.reflect.ClassTag$.MODULE$.apply(String.class));
+    }
+    
+    @POST
+    @Path("/stream")
+    @Consumes("text/plain")
+    @Produces("text/plain")
+    public StreamingOutput getStream(InputStream is) {
+        
+        return null;
+    }
+
+    @SuppressWarnings("serial")
+    private static JavaPairDStream<String, Integer> createOutputDStream(JavaReceiverInputDStream<String>
receiverStream) {
+        final JavaDStream<String> words = receiverStream.flatMap(
+            new FlatMapFunction<String, String>() {
+                @Override 
+                public Iterable<String> call(String x) {
+                    return Arrays.asList(x.split(" "));
+                }
+            });
+        final JavaPairDStream<String, Integer> pairs = words.mapToPair(
+            new PairFunction<String, String, Integer>() {
+            
+                @Override 
+                public Tuple2<String, Integer> call(String s) {
+                    return new Tuple2<String, Integer>(s, 1);
+                }
+            });
+        return pairs.reduceByKey(
+            new Function2<Integer, Integer, Integer>() {
+             
+                @Override 
+                public Integer call(Integer i1, Integer i2) {
+                    return i1 + i2;
+                }
+            });
+    }
+   
+    
+    public static class MyReceiverInputDStream extends ReceiverInputDStream<String>
{
+
+        public MyReceiverInputDStream(StreamingContext ssc_, ClassTag<String> evidence$1)
{
+            super(ssc_, evidence$1);
+        }
+        public void putInputStream(InputStream is) {
+            
+        }
+        @Override
+        public Receiver<String> getReceiver() {
+            return new InputStreamReceiver(getInputStream());
+        }
+        public InputStream getInputStream() {
+            // TODO Auto-generated method stub
+            return null;
+        }    
+    }
+
+
+    
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/22755d26/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/InputStreamReceiver.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/InputStreamReceiver.java
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/InputStreamReceiver.java
index de3ddf5..acbc358 100644
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/InputStreamReceiver.java
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/InputStreamReceiver.java
@@ -21,6 +21,10 @@ public class InputStreamReceiver extends Receiver<String> {
         super(StorageLevel.MEMORY_ONLY());
         BufferedReader reader = new BufferedReader(new InputStreamReader(is));
         String userInput = null;
+        // Receiver is meant to be serializable, but it would be
+        // great if we could avoid copying InputStream
+        // TODO: submit Spark enhancement request so that it can keep streaming from 
+        // the incoming InputStream to its processing nodes ?
         while ((userInput = readLine(reader)) != null) {
             inputStrings.add(userInput);
         }

http://git-wip-us.apache.org/repos/asf/cxf/blob/22755d26/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
index 6be0594..d76f549 100644
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
@@ -79,6 +79,9 @@ public class SparkStreamingOutput implements StreamingOutput {
         batchCompleted = true;
     }
     
+    
+    // This dedicated class was introduced to validate that when Spark is running it does not
+    // fail the processing due to OutputStream being one of the fields in the serializable class.
     private class OutputFunction implements VoidFunction<JavaPairRDD<String, Integer>>
{
         private static final long serialVersionUID = 1L;
         private OutputStream os;
@@ -96,6 +99,8 @@ public class SparkStreamingOutput implements StreamingOutput {
                     throw new WebApplicationException(); 
                 }
             }
+            // Right now we assume that, by the time this is called, the batch has processed the
+            // whole InputStream
             releaseStreamingContext();
         }
         
@@ -104,6 +109,9 @@ public class SparkStreamingOutput implements StreamingOutput {
 
         @Override
         public void onBatchCompleted(StreamingListenerBatchCompleted event) {
+            // as soon as the batch is finished we let the streaming context go
+            // but this may need to be revisited if a given InputStream happens to be processed in
+            // multiple batches ?
             setBatchCompleted();
         }
 

http://git-wip-us.apache.org/repos/asf/cxf/blob/22755d26/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
index 4e2e8ff..1dcb439 100644
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
@@ -62,9 +62,8 @@ public class StreamingService {
             return new SparkStreamingOutput(jssc, 
                                             createOutputDStream(receiverStream));
         } catch (Exception ex) {
+            // the compiler does not allow catching SparkException directly
             if (ex instanceof SparkException) {
-                // org.apache.spark.SparkException: Only one SparkContext may be running
in this JVM (see SPARK-2243).
-                // To ignore this error, set spark.driver.allowMultipleContexts = true
                 throw new WebApplicationException(Response.status(503).header("Retry-After",
"60").build());
             } else {
                 throw new WebApplicationException(ex);
@@ -99,18 +98,5 @@ public class StreamingService {
             });
     }
    
-    //new MyReceiverInputDStream(jssc.ssc(), 
-    //                           scala.reflect.ClassTag$.MODULE$.apply(String.class));
-//    public static class MyReceiverInputDStream extends ReceiverInputDStream<String>
{
-//
-//        public MyReceiverInputDStream(StreamingContext ssc_, ClassTag<String> evidence$1)
{
-//            super(ssc_, evidence$1);
-//        }
-//
-//        @Override
-//        public Receiver<String> getReceiver() {
-//            return new InputStreamReceiver(is);
-//        }
-//        
-//    }
+    
 }


Mime
View raw message