qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rob...@apache.org
Subject [1/2] qpid-jms git commit: QPIDJMS-45: add support for sending empty frames to conform to remotes requested idle-timeout value
Date Wed, 06 May 2015 16:29:32 GMT
Repository: qpid-jms
Updated Branches:
  refs/heads/master 8bcad7f2f -> cc4921293


QPIDJMS-45: add support for sending empty frames to conform to remotes requested idle-timeout
value


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/31e78722
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/31e78722
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/31e78722

Branch: refs/heads/master
Commit: 31e78722f26f9eeec6cac9fd4a88ff937436c4e6
Parents: 8bcad7f
Author: Robert Gemmell <robbie@apache.org>
Authored: Wed May 6 12:46:55 2015 +0100
Committer: Robert Gemmell <robbie@apache.org>
Committed: Wed May 6 17:13:56 2015 +0100

----------------------------------------------------------------------
 .../qpid/jms/provider/amqp/AmqpProvider.java    | 42 ++++++++++++++++++++
 1 file changed, 42 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/31e78722/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index 2314269..f697541 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -25,6 +25,7 @@ import java.net.URI;
 import java.nio.ByteBuffer;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -59,6 +60,7 @@ import org.apache.qpid.jms.transports.TransportListener;
 import org.apache.qpid.jms.util.IOExceptionSupport;
 import org.apache.qpid.proton.engine.Collector;
 import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.EndpointState;
 import org.apache.qpid.proton.engine.Event;
 import org.apache.qpid.proton.engine.Event.Type;
 import org.apache.qpid.proton.engine.Sasl;
@@ -115,6 +117,8 @@ public class AmqpProvider implements Provider, TransportListener {
     private final Transport protonTransport = Transport.Factory.create();
     private final Collector protonCollector = new CollectorImpl();
 
+    private ScheduledFuture<?> nextIdleTimeoutCheck;
+
     /**
      * Create a new instance of an AmqpProvider bonded to the given remote URI.
      *
@@ -206,6 +210,11 @@ public class AmqpProvider implements Provider, TransportListener {
                     }
                 }
 
+                if (nextIdleTimeoutCheck != null) {
+                    LOG.trace("Cancelling IdleTimeoutCheck");
+                    nextIdleTimeoutCheck.cancel(false);
+                    nextIdleTimeoutCheck = null;
+                }
                 serializer.shutdown();
             }
         }
@@ -796,6 +805,12 @@ public class AmqpProvider implements Provider, TransportListener {
     }
 
     void fireConnectionEstablished() {
+        int remoteIdleTimeout = protonTransport.getRemoteIdleTimeout();
+        if(remoteIdleTimeout > 0){
+            LOG.trace("IdleTimeoutCheck being initiated");
+            nextIdleTimeoutCheck = serializer.schedule(new IdleTimeoutCheck(), remoteIdleTimeout
/ 2, TimeUnit.MILLISECONDS);
+        }
+
         ProviderListener listener = this.listener;
         if (listener != null) {
             listener.onConnectionEstablished(remoteURI);
@@ -928,4 +943,31 @@ public class AmqpProvider implements Provider, TransportListener {
     public URI getRemoteURI() {
         return remoteURI;
     }
+
+    private final class IdleTimeoutCheck implements Runnable {
+        @Override
+        public void run() {
+            boolean doCheck = connection.getLocalState() == EndpointState.ACTIVE;
+
+            if (doCheck) {
+                long now = System.currentTimeMillis();
+                long deadline = protonTransport.tick(now);
+
+                pumpToProtonTransport(NOOP_REQUEST);
+                if (deadline > 0) {
+                    long delay = deadline - now;
+
+                    LOG.trace("IdleTimeoutCheck rescheduling with delay: {}", delay);
+                    nextIdleTimeoutCheck = serializer.schedule(this, delay, TimeUnit.MILLISECONDS);
+                } else {
+                    doCheck = false;
+                }
+            }
+
+            if(!doCheck) {
+                nextIdleTimeoutCheck = null;
+                LOG.trace("IdleTimeoutCheck exiting");
+            }
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message