cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From serg...@apache.org
Subject [1/2] cxf git commit: [CXF-6618] Prototyping a basic Spark streaming demo
Date Wed, 10 Feb 2016 22:37:39 GMT
Repository: cxf
Updated Branches:
  refs/heads/master c857aa32e -> c6fb2db07


[CXF-6618] Prototyping a basic Spark streaming demo


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/e6f07226
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/e6f07226
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/e6f07226

Branch: refs/heads/master
Commit: e6f0722615190861192972a51b3986b34efd1f0e
Parents: 5f038c2
Author: Sergey Beryozkin <sberyozkin@gmail.com>
Authored: Wed Feb 10 22:36:55 2016 +0000
Committer: Sergey Beryozkin <sberyozkin@gmail.com>
Committed: Wed Feb 10 22:36:55 2016 +0000

----------------------------------------------------------------------
 .../main/release/samples/jax_rs/spark/pom.xml   |  90 ++++++++++++
 .../demo/jaxrs/server/InputStreamReceiver.java  |  45 ++++++
 .../src/main/java/demo/jaxrs/server/Server.java |  49 +++++++
 .../demo/jaxrs/server/SparkStreamingOutput.java | 139 +++++++++++++++++++
 .../demo/jaxrs/server/StreamingService.java     | 116 ++++++++++++++++
 5 files changed, 439 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/e6f07226/distribution/src/main/release/samples/jax_rs/spark/pom.xml
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/pom.xml b/distribution/src/main/release/samples/jax_rs/spark/pom.xml
new file mode 100644
index 0000000..2971162
--- /dev/null
+++ b/distribution/src/main/release/samples/jax_rs/spark/pom.xml
@@ -0,0 +1,90 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements. See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership. The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at
+ 
+  http://www.apache.org/licenses/LICENSE-2.0
+ 
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied. See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>jax_rs_spark</artifactId>
+    <name>JAX-RS Spark Streaming Demo</name>
+    <description>JAX-RS Spark Streaming Demo</description>
+    <parent>
+        <groupId>org.apache.cxf.samples</groupId>
+        <artifactId>cxf-samples</artifactId>
+        <version>3.2.0-SNAPSHOT</version>
+        <relativePath>../..</relativePath>
+    </parent>
+    <properties>
+        <!-- The CXF dependencies below follow the version inherited from the parent POM. -->
+        <cxf.version>${project.version}</cxf.version>
+        <httpclient.version>3.1</httpclient.version>
+    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.cxf</groupId>
+            <artifactId>cxf-rt-transports-http-jetty</artifactId>
+            <version>${cxf.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.cxf</groupId>
+            <artifactId>cxf-rt-frontend-jaxrs</artifactId>
+            <version>${cxf.version}</version>
+        </dependency>
+        <!-- Spark's own netty is excluded and a fixed netty version is declared instead. -->
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-streaming_2.10</artifactId>
+            <version>1.6.0</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>io.netty</groupId>
+                    <artifactId>netty</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty</artifactId>
+            <version>3.7.0.Final</version>
+        </dependency>
+    </dependencies>
+
+    <repositories>
+        <repository>
+            <id>repo</id>
+            <url>http://mvnrepository.com/artifact</url>
+        </repository>
+    </repositories>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>exec-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>exec</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <executable>java</executable>
+                    <mainClass>demo.jaxrs.server.Server</mainClass>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/cxf/blob/e6f07226/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/InputStreamReceiver.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/InputStreamReceiver.java
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/InputStreamReceiver.java
new file mode 100644
index 0000000..de3ddf5
--- /dev/null
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/InputStreamReceiver.java
@@ -0,0 +1,45 @@
+package demo.jaxrs.server;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.LinkedList;
+import java.util.List;
+
+import javax.ws.rs.WebApplicationException;
+
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.receiver.Receiver;
+
+/**
+ * Spark streaming receiver that reads every line of an input stream when it is constructed
+ * and stores the buffered lines with Spark when the receiver is started.
+ */
+public class InputStreamReceiver extends Receiver<String> {
+
+    private static final long serialVersionUID = 1L;
+    private final List<String> inputStrings = new LinkedList<String>();
+
+    /**
+     * Reads the stream line by line until end of stream and keeps the lines in memory.
+     *
+     * @param is the stream to read; its bytes are decoded as UTF-8
+     * @throws WebApplicationException with status 500 if reading the stream fails
+     */
+    public InputStreamReceiver(InputStream is) {
+        super(StorageLevel.MEMORY_ONLY());
+        // Decode with an explicit charset so the result does not depend on the platform default.
+        BufferedReader reader = new BufferedReader(
+            new InputStreamReader(is, java.nio.charset.StandardCharsets.UTF_8));
+        String userInput = null;
+        while ((userInput = readLine(reader)) != null) {
+            inputStrings.add(userInput);
+        }
+    }
+
+    /** Hands all buffered lines to Spark in a single store call. */
+    @Override
+    public void onStart() {
+        super.store(inputStrings.iterator());
+    }
+
+    /**
+     * Reads one line, converting an I/O failure into an HTTP 500 error.
+     *
+     * @return the next line, or {@code null} at end of stream
+     */
+    private String readLine(BufferedReader reader) {
+        try {
+            return reader.readLine();
+        } catch (IOException ex) {
+            // Keep the original exception as the cause so the failure can be diagnosed.
+            throw new WebApplicationException(ex, 500);
+        }
+    }
+
+    /** Nothing to release: all input was consumed in the constructor. */
+    @Override
+    public void onStop() {
+        // complete
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/e6f07226/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java
new file mode 100644
index 0000000..31a9d04
--- /dev/null
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package demo.jaxrs.server;
+
+import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
+import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
+import org.apache.spark.SparkConf;
+
+
+/**
+ * Standalone launcher that publishes {@link StreamingService} as a JAX-RS endpoint
+ * at {@code http://localhost:9000/}.
+ */
+public class Server {
+
+    /**
+     * Creates the Spark configuration, registers a single shared {@link StreamingService}
+     * instance as the resource and starts the endpoint.
+     */
+    protected Server() throws Exception {
+        SparkConf sparkConf = new SparkConf()
+            .setMaster("local[*]")
+            .setAppName("JAX-RS Spark Connect");
+
+        JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
+        sf.setResourceClasses(StreamingService.class);
+        sf.setResourceProvider(StreamingService.class,
+            new SingletonResourceProvider(new StreamingService(sparkConf)));
+        sf.setAddress("http://localhost:9000/");
+
+        sf.create();
+    }
+
+    /**
+     * Starts the server, keeps it running for one hour and then exits the JVM.
+     */
+    public static void main(String[] args) throws Exception {
+        new Server();
+        // Announce readiness as soon as the endpoint is up, not after the sleep.
+        System.out.println("Server ready...");
+        Thread.sleep(60 * 60 * 1000);
+        System.out.println("Server exiting");
+        System.exit(0);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/e6f07226/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
new file mode 100644
index 0000000..6be0594
--- /dev/null
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package demo.jaxrs.server;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Map;
+
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.StreamingOutput;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.VoidFunction;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.scheduler.StreamingListener;
+import org.apache.spark.streaming.scheduler.StreamingListenerBatchCompleted;
+import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted;
+import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted;
+import org.apache.spark.streaming.scheduler.StreamingListenerOutputOperationCompleted;
+import org.apache.spark.streaming.scheduler.StreamingListenerOutputOperationStarted;
+import org.apache.spark.streaming.scheduler.StreamingListenerReceiverError;
+import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted;
+import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStopped;
+
+public class SparkStreamingOutput implements StreamingOutput {
+    private JavaPairDStream<String, Integer> wordCounts;
+    private JavaStreamingContext jssc;
+    private boolean sparkDone;
+    private boolean batchCompleted;
+    public SparkStreamingOutput(JavaStreamingContext jssc, JavaPairDStream<String, Integer>
wordCounts) {
+        this.jssc = jssc;
+        this.wordCounts = wordCounts;
+    }
+
+    @Override
+    public void write(final OutputStream output) throws IOException, WebApplicationException
{
+        wordCounts.foreachRDD(new OutputFunction(output));
+        jssc.addStreamingListener(new SparkStreamingListener());
+        jssc.start();
+        awaitTermination();
+        jssc.stop(false);
+        jssc.close();
+    }
+
+    private synchronized void awaitTermination() {
+        while (!sparkDone) {
+            try {
+                wait();
+            } catch (InterruptedException e) {
+                return;
+            }
+        }
+    }
+    private synchronized void releaseStreamingContext() {
+        if (batchCompleted) {
+            sparkDone = true;
+            notify();
+        }
+    }
+    
+    private synchronized void setBatchCompleted() {
+        batchCompleted = true;
+    }
+    
+    private class OutputFunction implements VoidFunction<JavaPairRDD<String, Integer>>
{
+        private static final long serialVersionUID = 1L;
+        private OutputStream os;
+        public OutputFunction(OutputStream os) {
+            this.os = os;
+        }
+        @Override
+        public void call(JavaPairRDD<String, Integer> rdd) {
+            for (Map.Entry<String, Integer> entry : rdd.collectAsMap().entrySet())
{
+                String value = entry.getKey() + " : " + entry.getValue() + "\r\n";
+                try {
+                    os.write(value.getBytes());
+                    os.flush();
+                } catch (IOException ex) {
+                    throw new WebApplicationException(); 
+                }
+            }
+            releaseStreamingContext();
+        }
+        
+    }
+    private class SparkStreamingListener implements StreamingListener {
+
+        @Override
+        public void onBatchCompleted(StreamingListenerBatchCompleted event) {
+            setBatchCompleted();
+        }
+
+        @Override
+        public void onBatchStarted(StreamingListenerBatchStarted arg0) {
+        }
+
+        @Override
+        public void onBatchSubmitted(StreamingListenerBatchSubmitted arg0) {
+        }
+
+        @Override
+        public void onOutputOperationCompleted(StreamingListenerOutputOperationCompleted
arg0) {
+        }
+
+        @Override
+        public void onOutputOperationStarted(StreamingListenerOutputOperationStarted arg0)
{
+        }
+
+        @Override
+        public void onReceiverError(StreamingListenerReceiverError arg0) {
+        }
+
+        @Override
+        public void onReceiverStarted(StreamingListenerReceiverStarted arg0) {
+        }
+
+        @Override
+        public void onReceiverStopped(StreamingListenerReceiverStopped arg0) {
+        }
+        
+    }
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/e6f07226/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
new file mode 100644
index 0000000..4e2e8ff
--- /dev/null
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package demo.jaxrs.server;
+
+import java.io.InputStream;
+import java.util.Arrays;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.StreamingOutput;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkException;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.streaming.Durations;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+
+import scala.Tuple2;
+
+
+@Path("/")
+public class StreamingService {
+    private SparkConf sparkConf;
+    public StreamingService(SparkConf sparkConf) {
+        this.sparkConf = sparkConf;
+    }
+    
+    @POST
+    @Path("/stream")
+    @Consumes("text/plain")
+    @Produces("text/plain")
+    public StreamingOutput getStream(InputStream is) {
+        try {
+            JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
+            JavaReceiverInputDStream<String> receiverStream = 
+                jssc.receiverStream(new InputStreamReceiver(is));
+            return new SparkStreamingOutput(jssc, 
+                                            createOutputDStream(receiverStream));
+        } catch (Exception ex) {
+            if (ex instanceof SparkException) {
+                // org.apache.spark.SparkException: Only one SparkContext may be running
in this JVM (see SPARK-2243).
+                // To ignore this error, set spark.driver.allowMultipleContexts = true
+                throw new WebApplicationException(Response.status(503).header("Retry-After",
"60").build());
+            } else {
+                throw new WebApplicationException(ex);
+            }
+        }
+    }
+
+    @SuppressWarnings("serial")
+    private static JavaPairDStream<String, Integer> createOutputDStream(JavaReceiverInputDStream<String>
receiverStream) {
+        final JavaDStream<String> words = receiverStream.flatMap(
+            new FlatMapFunction<String, String>() {
+                @Override 
+                public Iterable<String> call(String x) {
+                    return Arrays.asList(x.split(" "));
+                }
+            });
+        final JavaPairDStream<String, Integer> pairs = words.mapToPair(
+            new PairFunction<String, String, Integer>() {
+            
+                @Override 
+                public Tuple2<String, Integer> call(String s) {
+                    return new Tuple2<String, Integer>(s, 1);
+                }
+            });
+        return pairs.reduceByKey(
+            new Function2<Integer, Integer, Integer>() {
+             
+                @Override 
+                public Integer call(Integer i1, Integer i2) {
+                    return i1 + i2;
+                }
+            });
+    }
+   
+    //new MyReceiverInputDStream(jssc.ssc(), 
+    //                           scala.reflect.ClassTag$.MODULE$.apply(String.class));
+//    public static class MyReceiverInputDStream extends ReceiverInputDStream<String>
{
+//
+//        public MyReceiverInputDStream(StreamingContext ssc_, ClassTag<String> evidence$1)
{
+//            super(ssc_, evidence$1);
+//        }
+//
+//        @Override
+//        public Receiver<String> getReceiver() {
+//            return new InputStreamReceiver(is);
+//        }
+//        
+//    }
+}


Mime
View raw message