activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-5155
Date Tue, 28 Apr 2015 22:10:44 GMT
Repository: activemq
Updated Branches:
  refs/heads/master b444b6c46 -> eea3230c3


https://issues.apache.org/jira/browse/AMQ-5155

Ensure that the inactivity monitor can see that new data is arriving to
prevent wrongful disconnect of clients.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/eea3230c
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/eea3230c
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/eea3230c

Branch: refs/heads/master
Commit: eea3230c3783dd22abceee4f53cb928bc4121af1
Parents: b444b6c
Author: Timothy Bish <tabish121@gmail.com>
Authored: Tue Apr 28 18:10:24 2015 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Tue Apr 28 18:10:24 2015 -0400

----------------------------------------------------------------------
 .../transport/ws/AbstractStompSocket.java       | 130 +++++++++++++++++++
 .../transport/ws/jetty8/StompSocket.java        |  92 +------------
 .../transport/ws/jetty9/StompSocket.java        |  90 +------------
 3 files changed, 140 insertions(+), 172 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/eea3230c/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractStompSocket.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractStompSocket.java
b/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractStompSocket.java
new file mode 100644
index 0000000..b74bf5f
--- /dev/null
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractStompSocket.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.ws;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.KeepAliveInfo;
+import org.apache.activemq.transport.TransportSupport;
+import org.apache.activemq.transport.stomp.ProtocolConverter;
+import org.apache.activemq.transport.stomp.StompFrame;
+import org.apache.activemq.transport.stomp.StompInactivityMonitor;
+import org.apache.activemq.transport.stomp.StompTransport;
+import org.apache.activemq.transport.stomp.StompWireFormat;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.ServiceStopper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base implementation of a STOMP based WebSocket handler.
+ */
+public abstract class AbstractStompSocket extends TransportSupport implements StompTransport
{
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractStompSocket.class);
+
+    protected ProtocolConverter protocolConverter = new ProtocolConverter(this, null);
+    protected StompWireFormat wireFormat = new StompWireFormat();
+    protected final CountDownLatch socketTransportStarted = new CountDownLatch(1);
+    protected final StompInactivityMonitor stompInactivityMonitor = new StompInactivityMonitor(this,
wireFormat);
+    protected volatile int receiveCounter;
+
+    @Override
+    public void oneway(Object command) throws IOException {
+        try {
+            protocolConverter.onActiveMQCommand((Command)command);
+        } catch (Exception e) {
+            onException(IOExceptionSupport.create(e));
+        }
+    }
+
+    @Override
+    public void sendToActiveMQ(Command command) {
+        doConsume(command);
+    }
+
+    @Override
+    public abstract void sendToStomp(StompFrame command) throws IOException;
+
+    @Override
+    protected void doStop(ServiceStopper stopper) throws Exception {
+        stompInactivityMonitor.stop();
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        socketTransportStarted.countDown();
+        stompInactivityMonitor.setTransportListener(getTransportListener());
+    }
+
+    //----- Accessor methods -------------------------------------------------//
+
+    @Override
+    public StompInactivityMonitor getInactivityMonitor() {
+        return stompInactivityMonitor;
+    }
+
+    @Override
+    public StompWireFormat getWireFormat() {
+        return wireFormat;
+    }
+
+    @Override
+    public String getRemoteAddress() {
+        return "StompSocket_" + this.hashCode();
+    }
+
+    @Override
+    public int getReceiveCounter() {
+        return receiveCounter;
+    }
+
+    //----- Internal implementation ------------------------------------------//
+
+    protected void processStompFrame(String data) {
+
+        if (!transportStartedAtLeastOnce()) {
+            LOG.debug("Waiting for StompSocket to be properly started...");
+            try {
+                socketTransportStarted.await();
+            } catch (InterruptedException e) {
+                LOG.warn("While waiting for StompSocket to be properly started, we got interrupted!!
Should be okay, but you could see race conditions...");
+            }
+        }
+
+        try {
+            if (data != null) {
+                receiveCounter += data.length();
+
+                if (data.equals("\n")) {
+                    stompInactivityMonitor.onCommand(new KeepAliveInfo());
+                } else {
+                    protocolConverter.onStompCommand((StompFrame)wireFormat.unmarshal(new
ByteSequence(data.getBytes("UTF-8"))));
+                }
+            }
+        } catch (Exception e) {
+            onException(IOExceptionSupport.create(e));
+        }
+    }
+
+    private boolean transportStartedAtLeastOnce() {
+        return socketTransportStarted.getCount() == 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/eea3230c/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/StompSocket.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/StompSocket.java
b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/StompSocket.java
index 39cf0db..be1c8d1 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/StompSocket.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/StompSocket.java
@@ -17,20 +17,10 @@
 package org.apache.activemq.transport.ws.jetty8;
 
 import java.io.IOException;
-import java.util.concurrent.CountDownLatch;
 
-import org.apache.activemq.command.Command;
-import org.apache.activemq.command.KeepAliveInfo;
-import org.apache.activemq.transport.TransportSupport;
-import org.apache.activemq.transport.stomp.ProtocolConverter;
 import org.apache.activemq.transport.stomp.Stomp;
 import org.apache.activemq.transport.stomp.StompFrame;
-import org.apache.activemq.transport.stomp.StompInactivityMonitor;
-import org.apache.activemq.transport.stomp.StompTransport;
-import org.apache.activemq.transport.stomp.StompWireFormat;
-import org.apache.activemq.util.ByteSequence;
-import org.apache.activemq.util.IOExceptionSupport;
-import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.transport.ws.AbstractStompSocket;
 import org.eclipse.jetty.websocket.WebSocket;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,14 +28,11 @@ import org.slf4j.LoggerFactory;
 /**
  * Implements web socket and mediates between servlet and the broker
  */
-class StompSocket extends TransportSupport implements WebSocket.OnTextMessage, StompTransport
{
+class StompSocket extends AbstractStompSocket implements WebSocket.OnTextMessage {
+
     private static final Logger LOG = LoggerFactory.getLogger(StompSocket.class);
 
-    Connection outbound;
-    ProtocolConverter protocolConverter = new ProtocolConverter(this, null);
-    StompWireFormat wireFormat = new StompWireFormat();
-    private final CountDownLatch socketTransportStarted = new CountDownLatch(1);
-    private final StompInactivityMonitor stompInactivityMonitor = new StompInactivityMonitor(this,
wireFormat);
+    private Connection outbound;
 
     @Override
     public void onOpen(Connection connection) {
@@ -63,80 +50,11 @@ class StompSocket extends TransportSupport implements WebSocket.OnTextMessage,
S
 
     @Override
     public void onMessage(String data) {
-
-        if (!transportStartedAtLeastOnce()) {
-            LOG.debug("Waiting for StompSocket to be properly started...");
-            try {
-                socketTransportStarted.await();
-            } catch (InterruptedException e) {
-                LOG.warn("While waiting for StompSocket to be properly started, we got interrupted!!
Should be okay, but you could see race conditions...");
-            }
-        }
-
-
-        try {
-            if (data != null) {
-                if (data.equals("\n")) {
-                    sendToActiveMQ(new KeepAliveInfo());
-                } else {
-                    protocolConverter.onStompCommand((StompFrame)wireFormat.unmarshal(new
ByteSequence(data.getBytes("UTF-8"))));
-                }
-            }
-        } catch (Exception e) {
-            onException(IOExceptionSupport.create(e));
-        }
-    }
-
-    private boolean transportStartedAtLeastOnce() {
-        return socketTransportStarted.getCount() == 0;
-    }
-
-    @Override
-    protected void doStart() throws Exception {
-        socketTransportStarted.countDown();
-        stompInactivityMonitor.setTransportListener(getTransportListener());
-    }
-
-    @Override
-    protected void doStop(ServiceStopper stopper) throws Exception {
-    }
-
-    @Override
-    public int getReceiveCounter() {
-        return 0;
-    }
-
-    @Override
-    public String getRemoteAddress() {
-        return "StompSocket_" + this.hashCode();
-    }
-
-    @Override
-    public void oneway(Object command) throws IOException {
-        try {
-            protocolConverter.onActiveMQCommand((Command)command);
-        } catch (Exception e) {
-            onException(IOExceptionSupport.create(e));
-        }
-    }
-
-    @Override
-    public void sendToActiveMQ(Command command) {
-        doConsume(command);
+        processStompFrame(data);
     }
 
     @Override
     public void sendToStomp(StompFrame command) throws IOException {
         outbound.sendMessage(command.format());
     }
-
-    @Override
-    public StompInactivityMonitor getInactivityMonitor() {
-        return stompInactivityMonitor;
-    }
-
-    @Override
-    public StompWireFormat getWireFormat() {
-        return this.wireFormat;
-    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/eea3230c/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/StompSocket.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/StompSocket.java
b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/StompSocket.java
index 8969b56..a07ccd0 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/StompSocket.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/StompSocket.java
@@ -17,20 +17,10 @@
 package org.apache.activemq.transport.ws.jetty9;
 
 import java.io.IOException;
-import java.util.concurrent.CountDownLatch;
 
-import org.apache.activemq.command.Command;
-import org.apache.activemq.command.KeepAliveInfo;
-import org.apache.activemq.transport.TransportSupport;
-import org.apache.activemq.transport.stomp.ProtocolConverter;
 import org.apache.activemq.transport.stomp.Stomp;
 import org.apache.activemq.transport.stomp.StompFrame;
-import org.apache.activemq.transport.stomp.StompInactivityMonitor;
-import org.apache.activemq.transport.stomp.StompTransport;
-import org.apache.activemq.transport.stomp.StompWireFormat;
-import org.apache.activemq.util.ByteSequence;
-import org.apache.activemq.util.IOExceptionSupport;
-import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.transport.ws.AbstractStompSocket;
 import org.eclipse.jetty.websocket.api.Session;
 import org.eclipse.jetty.websocket.api.WebSocketListener;
 import org.slf4j.Logger;
@@ -39,52 +29,11 @@ import org.slf4j.LoggerFactory;
 /**
  * Implements web socket and mediates between servlet and the broker
  */
-class StompSocket extends TransportSupport implements WebSocketListener, StompTransport {
-    private static final Logger LOG = LoggerFactory.getLogger(StompSocket.class);
-
-    Session session;
-    ProtocolConverter protocolConverter = new ProtocolConverter(this, null);
-    StompWireFormat wireFormat = new StompWireFormat();
-    private final CountDownLatch socketTransportStarted = new CountDownLatch(1);
-    private final StompInactivityMonitor stompInactivityMonitor = new StompInactivityMonitor(this,
wireFormat);
-
-    private boolean transportStartedAtLeastOnce() {
-        return socketTransportStarted.getCount() == 0;
-    }
-
-    @Override
-    protected void doStart() throws Exception {
-        socketTransportStarted.countDown();
-        stompInactivityMonitor.setTransportListener(getTransportListener());
-    }
-
-    @Override
-    protected void doStop(ServiceStopper stopper) throws Exception {
-    }
+class StompSocket extends AbstractStompSocket implements WebSocketListener {
 
-    @Override
-    public int getReceiveCounter() {
-        return 0;
-    }
-
-    @Override
-    public String getRemoteAddress() {
-        return "StompSocket_" + this.hashCode();
-    }
-
-    @Override
-    public void oneway(Object command) throws IOException {
-        try {
-            protocolConverter.onActiveMQCommand((Command)command);
-        } catch (Exception e) {
-            onException(IOExceptionSupport.create(e));
-        }
-    }
+    private static final Logger LOG = LoggerFactory.getLogger(StompSocket.class);
 
-    @Override
-    public void sendToActiveMQ(Command command) {
-        doConsume(command);
-    }
+    private Session session;
 
     @Override
     public void sendToStomp(StompFrame command) throws IOException {
@@ -92,16 +41,6 @@ class StompSocket extends TransportSupport implements WebSocketListener,
StompTr
     }
 
     @Override
-    public StompInactivityMonitor getInactivityMonitor() {
-        return stompInactivityMonitor;
-    }
-
-    @Override
-    public StompWireFormat getWireFormat() {
-        return this.wireFormat;
-    }
-
-    @Override
     public void onWebSocketBinary(byte[] arg0, int arg1, int arg2) {
     }
 
@@ -125,25 +64,6 @@ class StompSocket extends TransportSupport implements WebSocketListener,
StompTr
 
     @Override
     public void onWebSocketText(String data) {
-        if (!transportStartedAtLeastOnce()) {
-            LOG.debug("Waiting for StompSocket to be properly started...");
-            try {
-                socketTransportStarted.await();
-            } catch (InterruptedException e) {
-                LOG.warn("While waiting for StompSocket to be properly started, we got interrupted!!
Should be okay, but you could see race conditions...");
-            }
-        }
-
-        try {
-            if (data != null) {
-                if (data.equals("\n")) {
-                    sendToActiveMQ(new KeepAliveInfo());
-                } else {
-                    protocolConverter.onStompCommand((StompFrame)wireFormat.unmarshal(new
ByteSequence(data.getBytes("UTF-8"))));
-                }
-            }
-        } catch (Exception e) {
-            onException(IOExceptionSupport.create(e));
-        }
+        processStompFrame(data);
     }
 }


Mime
View raw message