cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a.@apache.org
Subject git commit: [CXF-5582] add StreamingResponse-based delivery to AtomPushBean
Date Mon, 01 Sep 2014 17:06:56 GMT
Repository: cxf
Updated Branches:
  refs/heads/master ec4435d70 -> 250dc87fc


[CXF-5582] add StreamingResponse-based delivery to AtomPushBean


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/250dc87f
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/250dc87f
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/250dc87f

Branch: refs/heads/master
Commit: 250dc87fc7e0b0f6caffb5d7500feac9db048bbb
Parents: ec4435d
Author: Akitoshi Yoshida <ay@apache.org>
Authored: Mon Sep 1 19:05:54 2014 +0200
Committer: Akitoshi Yoshida <ay@apache.org>
Committed: Mon Sep 1 19:07:01 2014 +0200

----------------------------------------------------------------------
 .../web/logging/atom/AtomPullServer.java        |  26 +-
 .../atom/AtomPushOverWebSocketServer.java       | 242 +++++++++++++++++++
 2 files changed, 259 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/250dc87f/rt/management-web/src/main/java/org/apache/cxf/management/web/logging/atom/AtomPullServer.java
----------------------------------------------------------------------
diff --git a/rt/management-web/src/main/java/org/apache/cxf/management/web/logging/atom/AtomPullServer.java b/rt/management-web/src/main/java/org/apache/cxf/management/web/logging/atom/AtomPullServer.java
index d133971..07a64ec 100644
--- a/rt/management-web/src/main/java/org/apache/cxf/management/web/logging/atom/AtomPullServer.java
+++ b/rt/management-web/src/main/java/org/apache/cxf/management/web/logging/atom/AtomPullServer.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -35,6 +34,7 @@ import java.util.WeakHashMap;
 import java.util.logging.Handler;
 
 import javax.ws.rs.GET;
+import javax.ws.rs.HeaderParam;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
@@ -73,8 +73,7 @@ public class AtomPullServer extends AbstractAtomBean {
     private volatile int recordsSize;
     private volatile boolean alreadyClosed;
     private SearchCondition<LogRecord> readableStorageCondition;
-    //TODO register streams using specific keys so that they can be individually unregistered
-    private List<Object> activeStreams;
+    private Map<String, Object> activeStreams;
     
     @Context
     private MessageContext context;
@@ -123,7 +122,7 @@ public class AtomPullServer extends AbstractAtomBean {
             }
             readableStorageCondition = list.size() == 0 ? null : new OrSearchCondition<LogRecord>(list);
         }
-        activeStreams = Collections.synchronizedList(new ArrayList<Object>());
+        activeStreams = Collections.synchronizedMap(new HashMap<String, Object>());
         initBusProperty();
     }
     
@@ -206,13 +205,14 @@ public class AtomPullServer extends AbstractAtomBean {
     @GET
     @Produces({"text/html", "application/xhtml+xml" })
     @Path("subscribe/alternate")
-    public StreamingOutput getAlternateContinuousFeed() {
+    public StreamingOutput getAlternateContinuousFeed(@HeaderParam("requestId") String reqid) {
+        final String key = reqid == null ? "*" : reqid;
         return new StreamingOutput() {
             public void write(final OutputStream out) throws IOException, WebApplicationException {
                 // return the last entry
                 out.write(convertEntryToHtmlFragment(((LinkedList<LogRecord>)records).getLast()).getBytes());
                 
-                activeStreams.add(out);
+                activeStreams.put(key, out);
             }
         };
     }
@@ -221,7 +221,8 @@ public class AtomPullServer extends AbstractAtomBean {
     @GET
     @Produces("application/atom+xml;type=entry")
     @Path("subscribe")
-    public StreamingResponse<Entry> getXmlContinuousFeed() {
+    public StreamingResponse<Entry> getXmlContinuousFeed(@HeaderParam("requestId") String reqid) {
+        final String key = reqid == null ? "*" : reqid;
         return new StreamingResponse<Entry>() {
             public void writeTo(final StreamingResponse.Writer<Entry> out) throws IOException {
                 // return the last entry
@@ -231,12 +232,19 @@ public class AtomPullServer extends AbstractAtomBean {
                     .convert(Collections.singletonList(((LinkedList<LogRecord>)records).getLast())).get(0);
                 out.write(entry);
 
-                activeStreams.add(out);
+                activeStreams.put(key, out);
             }
         };
     }
 
     @GET
+    @Produces("text/plain")
+    @Path("unsubscribe/{key}")
+    public Boolean unsubscribeContinuousFeed(@PathParam("key") String key) {
+        return activeStreams.remove(key) != null;
+    }
+
+    @GET
     @Path("entry/{id}")
     @Produces("application/atom+xml;type=entry")
     public Entry getEntry(@PathParam("id") int index) {
@@ -394,7 +402,7 @@ public class AtomPullServer extends AbstractAtomBean {
         if (activeStreams.size() > 0) {
             byte[] rbytes = null;
             Entry rentry = null;
-            for (Iterator<Object> it = activeStreams.iterator(); it.hasNext();) {
+            for (Iterator<Object> it = activeStreams.values().iterator(); it.hasNext();) {
                 Object out = it.next();
                 try {
                     if (out instanceof OutputStream) {

http://git-wip-us.apache.org/repos/asf/cxf/blob/250dc87f/rt/management-web/src/main/java/org/apache/cxf/management/web/logging/atom/AtomPushOverWebSocketServer.java
----------------------------------------------------------------------
diff --git a/rt/management-web/src/main/java/org/apache/cxf/management/web/logging/atom/AtomPushOverWebSocketServer.java b/rt/management-web/src/main/java/org/apache/cxf/management/web/logging/atom/AtomPushOverWebSocketServer.java
new file mode 100644
index 0000000..7a51445
--- /dev/null
+++ b/rt/management-web/src/main/java/org/apache/cxf/management/web/logging/atom/AtomPushOverWebSocketServer.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.management.web.logging.atom;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.logging.Handler;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.HeaderParam;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+
+import org.apache.abdera.model.Element;
+import org.apache.abdera.model.Feed;
+import org.apache.commons.lang.Validate;
+import org.apache.cxf.jaxrs.ext.StreamingResponse;
+import org.apache.cxf.management.web.logging.atom.converter.Converter;
+import org.apache.cxf.management.web.logging.atom.deliverer.Deliverer;
+
+/**
+ * Bean used to configure {@link AtomPushHandler JUL handler} with Spring instead of properties file. See
+ * {@link AtomPushHandler} class for detailed description of parameters. Next to configuration of handler,
+ * Spring bean offers simple configuration of associated loggers that share ATOM push-style handler.
+ * <p>
+ * General rules:
+ * <ul>
+ * <li>When {@link #setConverter(Converter) converter} property is not set explicitly, default converter is
+ * created.</li>
+ * <li>When {@link #setLoggers(String) loggers} property is used, it overrides pair of
+ * {@link #setLogger(String) logger} and {@link #setLevel(String) level} properties; and vice versa.</li>
+ * <li>When logger is not set, handler is attached to root logger (named ""); when level is not set for
+ * logger, default "INFO" level is used.</li>
+ * <li>When {@link #setBatchSize(String) batchSize} property is not set or set to wrong value, default batch
+ * size of "1" is used.</li>
+ * <li>When deliverer property is NOT set, use of "retryXxx" properties causes creation of retrying default
+ * deliverer.</li>
+ * </ul>
+ * Examples:
+ * <p>
+ * ATOM push handler with registered with root logger for all levels or log events, pushing one feed per event
+ * over the connected websocket, using default conversion methods:
+ * 
+ * <pre>
+ *   &lt;bean class=&quot;org.apache.cxf.jaxrs.ext.logging.atom.AtomPushOverWebSocketBean&quot;
+ *     init-method=&quot;init&quot;&gt;
+ *       &lt;property name=&quot;level&quot; value=&quot;ALL&quot; /&gt;
+ *   &lt;/bean&gt;
+ * </pre>
+ * 
+ * ATOM push handler registered with multiple loggers and listening for different levels (see
+ * {@link #setLoggers(String) loggers} property description for syntax details). Custom deliverer will take
+ * care of feeds, each of which carries batch of 10 log events:
+ * 
+ * <pre>
+ *   ...
+ *   &lt;bean class=&quot;org.apache.cxf.jaxrs.ext.logging.atom.AtomPushOverWebSocketServer&quot;
+ *     init-method=&quot;init&quot;&gt;
+ *       &lt;property name=&quot;loggers&quot; value=&quot;
+ *           org.apache.cxf:DEBUG,
+ *           org.apache.cxf.jaxrs,
+ *           org.apache.cxf.bus:ERROR&quot; /&gt;
+ *       &lt;property name=&quot;batchSize&quot; value=&quot;10&quot; /&gt;
+ *   &lt;/bean&gt;
+ * </pre>
+ */
+//REVISIT we will move the common part into AbstractAtomPushBean so that it can be shared by both AtomPushBean and this
+@Path("/logs2")
+public final class AtomPushOverWebSocketServer extends AbstractAtomBean {
+    private AtomPushEngineConfigurator conf = new AtomPushEngineConfigurator();
+    private Map<String, Object> activeStreams;
+
+    /**
+     * Creates unconfigured and uninitialized bean. To configure setters must be used, then {@link #init()}
+     * must be called.
+     */
+    public AtomPushOverWebSocketServer() {
+        conf.setDeliverer(new WebSocketDeliverer());
+    }
+
+    @Override
+    public void init() {
+        super.init();
+        activeStreams = Collections.synchronizedMap(new HashMap<String, Object>());
+    }
+
+    /**
+     * Set initialized converter.
+     */
+    public void setConverter(Converter converter) {
+        checkInit();
+        Validate.notNull(converter, "converter is null");
+        conf.setConverter(converter);
+    }
+    
+    /**
+     * Size of batch; empty string for default one element batch.
+     */
+    public void setBatchSize(String batchSize) {
+        checkInit();
+        Validate.notNull(batchSize, "batchSize is null");
+        conf.setBatchSize(batchSize);
+    }
+    
+    /**
+     * Batch cleanup time in minutes
+     */
+    public void setBatchCleanupTime(String batchCleanupTime) {
+        checkInit();
+        Validate.notNull(batchCleanupTime, "batchCleanup is null");
+        conf.setBatchCleanupTime(batchCleanupTime);
+    }
+
+    /**
+     * Retry pause calculation strategy, either "linear" or "exponential".
+     */
+    public void setRetryPause(String retryPause) {
+        checkInit();
+        Validate.notNull(retryPause, "retryPause is null");
+        conf.setRetryPause(retryPause);
+    }
+
+    /**
+     * Retry pause time (in seconds).
+     */
+    public void setRetryPauseTime(String time) {
+        checkInit();
+        Validate.notNull(time, "time is null");
+        conf.setRetryPauseTime(time);
+    }
+
+    /**
+     * Retry timeout (in seconds).
+     */
+    public void setRetryTimeout(String timeout) {
+        checkInit();
+        Validate.notNull(timeout, "timeout is null");
+        conf.setRetryTimeout(timeout);
+    }
+
+    /**
+     * Conversion output type: "feed" or "entry".
+     */
+    public void setOutput(String output) {
+        checkInit();
+        Validate.notNull(output, "output is null");
+        conf.setOutput(output);
+    }
+
+    /**
+     * Multiplicity of subelement of output: "one" or "many".
+     */
+    public void setMultiplicity(String multiplicity) {
+        checkInit();
+        Validate.notNull(multiplicity, "multiplicity is null");
+        conf.setMultiplicity(multiplicity);
+    }
+
+    /**
+     * Entry data format: "content" or "extension".
+     */
+    public void setFormat(String format) {
+        checkInit();
+        Validate.notNull(format, "format is null");
+        conf.setFormat(format);
+    }
+
+    protected Handler createHandler() {
+        return new AtomPushHandler(conf.createEngine());
+    }
+
+    @GET
+    @Produces("application/atom+xml")
+    @Path("subscribe")
+    public StreamingResponse<Feed> subscribeXmlFeed(@HeaderParam("requestId") String reqid) {
+        final String key = reqid == null ? "*" : reqid;
+        return new StreamingResponse<Feed>() {
+            public void writeTo(final StreamingResponse.Writer<Feed> out) throws IOException {
+                activeStreams.put(key,  out);
+            }
+        };
+    }
+
+    @GET
+    @Produces("text/plain")
+    @Path("unsubscribe/{key}")
+    public Boolean unsubscribeXmlFeed(@PathParam("key") String key) {
+        return activeStreams.remove(key) != null;
+    }
+
+    private class WebSocketDeliverer implements Deliverer {
+
+        @Override
+        public boolean deliver(Element element) throws InterruptedException {
+            if (activeStreams.size() > 0) {
+                for (Iterator<Object> it = activeStreams.values().iterator(); it.hasNext();) {
+                    Object out = it.next();
+                    try {
+                        if (out instanceof StreamingResponse.Writer) {
+                            ((StreamingResponse.Writer)out).write(element);
+                        }
+                    } catch (Throwable t) {
+                        // REVISIT
+                        // the reason for not logging anything here is to not further clog the logger
+                        // with this log broadcasting failure.
+                        System.err.print("ERROR | AtomPushOverWebSocketServer | " + t + "; Unregistering " + out);
+                        it.remove();
+                    }
+                }
+            }
+
+            return true;
+        }
+
+        @Override
+        public String getEndpointAddress() {
+            //REVISIT return something else?
+            return null;
+        }
+        
+    }
+}


Mime
View raw message