qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rob...@apache.org
Subject [5/7] qpid-jms git commit: encapsulate the endpoint field
Date Mon, 10 Nov 2014 17:54:15 GMT
encapsulate the endpoint field


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/f2b3a5da
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/f2b3a5da
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/f2b3a5da

Branch: refs/heads/master
Commit: f2b3a5da8217e7dcd228ae8ee7ce32253b0780e4
Parents: da94884
Author: Robert Gemmell <robbie@apache.org>
Authored: Mon Nov 10 15:21:03 2014 +0000
Committer: Robert Gemmell <robbie@apache.org>
Committed: Mon Nov 10 17:48:37 2014 +0000

----------------------------------------------------------------------
 .../jms/provider/amqp/AmqpAbstractResource.java | 42 +++++++++++---------
 .../qpid/jms/provider/amqp/AmqpConnection.java  | 16 ++++----
 .../provider/amqp/AmqpConnectionSession.java    |  7 ++--
 .../qpid/jms/provider/amqp/AmqpConsumer.java    | 39 +++++++++---------
 .../jms/provider/amqp/AmqpFixedProducer.java    | 32 ++++++++-------
 .../jms/provider/amqp/AmqpQueueBrowser.java     | 20 +++++-----
 .../qpid/jms/provider/amqp/AmqpSession.java     |  4 +-
 .../provider/amqp/AmqpTemporaryDestination.java | 19 +++++----
 .../provider/amqp/AmqpTransactionContext.java   | 25 +++++++-----
 9 files changed, 112 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f2b3a5da/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
index 6def062..933683a 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
@@ -45,7 +45,7 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends
Endp
     protected AsyncResult openRequest;
     protected AsyncResult closeRequest;
 
-    protected E endpoint;
+    private E endpoint;
     protected R resource;
 
     /**
@@ -68,19 +68,19 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E
extends Endp
      */
     public AmqpAbstractResource(R resource, E endpoint) {
         this.resource = resource;
-        this.endpoint = endpoint;
+        setEndpoint(endpoint);
     }
 
     @Override
     public void open(AsyncResult request) {
         this.openRequest = request;
         doOpen();
-        this.endpoint.setContext(this);
+        getEndpoint().setContext(this);
     }
 
     @Override
     public boolean isOpen() {
-        return this.endpoint.getRemoteState() == EndpointState.ACTIVE;
+        return getEndpoint().getRemoteState() == EndpointState.ACTIVE;
     }
 
     @Override
@@ -99,7 +99,7 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends
Endp
     @Override
     public void close(AsyncResult request) {
         // If already closed signal success or else the caller might never get notified.
-        if (endpoint.getLocalState() == EndpointState.CLOSED) {
+        if (getEndpoint().getLocalState() == EndpointState.CLOSED) {
             request.onSuccess();
             return;
         }
@@ -110,7 +110,7 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E
extends Endp
 
     @Override
     public boolean isClosed() {
-        return this.endpoint.getLocalState() == EndpointState.CLOSED;
+        return getEndpoint().getLocalState() == EndpointState.CLOSED;
     }
 
     @Override
@@ -120,8 +120,8 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E
extends Endp
 
     @Override
     public void closed() {
-        this.endpoint.close();
-        this.endpoint.free();
+        getEndpoint().close();
+        getEndpoint().free();
 
         if (this.closeRequest != null) {
             this.closeRequest.onSuccess();
@@ -166,34 +166,38 @@ public abstract class AmqpAbstractResource<R extends JmsResource,
E extends Endp
         return this.endpoint;
     }
 
+    public void setEndpoint(E endpoint) {
+        this.endpoint = endpoint;
+    }
+
     public R getJmsResource() {
         return this.resource;
     }
 
     public EndpointState getLocalState() {
-        if (endpoint == null) {
+        if (getEndpoint() == null) {
             return EndpointState.UNINITIALIZED;
         }
-        return this.endpoint.getLocalState();
+        return getEndpoint().getLocalState();
     }
 
     public EndpointState getRemoteState() {
-        if (endpoint == null) {
+        if (getEndpoint() == null) {
             return EndpointState.UNINITIALIZED;
         }
-        return this.endpoint.getRemoteState();
+        return getEndpoint().getRemoteState();
     }
 
     @Override
     public boolean hasRemoteError() {
-        return endpoint.getRemoteCondition().getCondition() != null;
+        return getEndpoint().getRemoteCondition().getCondition() != null;
     }
 
     @Override
     public Exception getRemoteError() {
         String message = getRemoteErrorMessage();
         Exception remoteError = null;
-        Symbol error = endpoint.getRemoteCondition().getCondition();
+        Symbol error = getEndpoint().getRemoteCondition().getCondition();
         if (error != null) {
             if (error.equals(AmqpError.UNAUTHORIZED_ACCESS)) {
                 remoteError = new JMSSecurityException(message);
@@ -208,8 +212,8 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E
extends Endp
     @Override
     public String getRemoteErrorMessage() {
         String message = "Received unkown error from remote peer";
-        if (endpoint.getRemoteCondition() != null) {
-            ErrorCondition error = endpoint.getRemoteCondition();
+        if (getEndpoint().getRemoteCondition() != null) {
+            ErrorCondition error = getEndpoint().getRemoteCondition();
             if (error.getDescription() != null && !error.getDescription().isEmpty())
{
                 message = error.getDescription();
             }
@@ -220,7 +224,7 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E
extends Endp
 
     @Override
     public void processStateChange() throws IOException {
-        EndpointState remoteState = endpoint.getRemoteState();
+        EndpointState remoteState = getEndpoint().getRemoteState();
 
         if (remoteState == EndpointState.ACTIVE) {
             if (isAwaitingOpen()) {
@@ -258,7 +262,7 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E
extends Endp
      * updates.
      */
     protected void doOpen() {
-        endpoint.open();
+        getEndpoint().open();
     }
 
     /**
@@ -267,6 +271,6 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E
extends Endp
      * standard close path such as endpoint detach etc.
      */
     protected void doClose() {
-        endpoint.close();
+        getEndpoint().close();
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f2b3a5da/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
index 8d5a458..c500d74 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
@@ -94,8 +94,8 @@ public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo,
Conn
 
     @Override
     protected void doOpen() {
-        this.endpoint.setContainer(resource.getClientId());
-        this.endpoint.setHostname(remoteURI.getHost());
+        getEndpoint().setContainer(resource.getClientId());
+        getEndpoint().setHostname(remoteURI.getHost());
         super.doOpen();
     }
 
@@ -132,7 +132,7 @@ public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo,
Conn
             connected = true;
 
             this.properties = new AmqpConnectionProperties(
-                endpoint.getRemoteOfferedCapabilities(), endpoint.getRemoteProperties());
+                getEndpoint().getRemoteOfferedCapabilities(), getEndpoint().getRemoteProperties());
 
             connectionSession.open(new AsyncResult() {
 
@@ -155,14 +155,14 @@ public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo,
Conn
             });
         }
 
-        EndpointState localState = endpoint.getLocalState();
-        EndpointState remoteState = endpoint.getRemoteState();
+        EndpointState localState = getEndpoint().getLocalState();
+        EndpointState remoteState = getEndpoint().getRemoteState();
 
         // We are still active (connected or not) and something on the remote end has
         // closed us, signal an error if one was sent.
         if (localState == EndpointState.ACTIVE && remoteState != EndpointState.ACTIVE)
{
-            if (endpoint.getRemoteCondition().getCondition() != null) {
-                LOG.info("Error condition detected on Connection open {}.", endpoint.getRemoteCondition().getCondition());
+            if (getEndpoint().getRemoteCondition().getCondition() != null) {
+                LOG.info("Error condition detected on Connection open {}.", getEndpoint().getRemoteCondition().getCondition());
                 Exception remoteError = getRemoteError();
                 if (isAwaitingOpen()) {
                     openRequest.onFailure(remoteError);
@@ -214,7 +214,7 @@ public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo,
Conn
     }
 
     public Connection getProtonConnection() {
-        return this.endpoint;
+        return this.getEndpoint();
     }
 
     public URI getRemoteURI() {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f2b3a5da/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
index 3cfc0c6..e08d1b9 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
@@ -81,9 +81,10 @@ public class AmqpConnectionSession extends AmqpSession {
 
         @Override
         protected void doOpen() {
-            endpoint.setTarget(new Target());
-            endpoint.setSenderSettleMode(SenderSettleMode.UNSETTLED);
-            endpoint.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+            Receiver receiver = getEndpoint();
+            receiver.setTarget(new Target());
+            receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+            receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
             super.doOpen();
         }
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f2b3a5da/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index cb766bf..1a9be73 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -88,7 +88,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo,
Receiver
      * Starts the consumer by setting the link credit to the given prefetch value.
      */
     public void start(AsyncResult request) {
-        this.endpoint.flow(resource.getPrefetchSize());
+        getEndpoint().flow(resource.getPrefetchSize());
         request.onSuccess();
     }
 
@@ -110,15 +110,18 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo,
Receiver
             receiverName = resource.getSubscriptionName();
         }
 
-        endpoint = session.getProtonSession().receiver(receiverName);
-        endpoint.setSource(source);
-        endpoint.setTarget(target);
+        Receiver receiver = session.getProtonSession().receiver(receiverName);
+        receiver.setSource(source);
+        receiver.setTarget(target);
         if (isPresettle()) {
-            endpoint.setSenderSettleMode(SenderSettleMode.SETTLED);
+            receiver.setSenderSettleMode(SenderSettleMode.SETTLED);
         } else {
-            endpoint.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+            receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);
         }
-        endpoint.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+        receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+
+        setEndpoint(receiver);
+
         super.doOpen();
     }
 
@@ -251,9 +254,9 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo,
Receiver
             return;
         }
 
-        int currentCredit = endpoint.getCredit();
+        int currentCredit = getEndpoint().getCredit();
         if (currentCredit <= resource.getPrefetchSize() * 0.2) {
-            endpoint.flow(resource.getPrefetchSize() - currentCredit);
+            getEndpoint().flow(resource.getPrefetchSize() - currentCredit);
         }
     }
 
@@ -282,9 +285,9 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo,
Receiver
      * @param timeout
      */
     public void pull(long timeout) {
-        if (resource.getPrefetchSize() == 0 && endpoint.getCredit() == 0) {
+        if (resource.getPrefetchSize() == 0 && getEndpoint().getCredit() == 0) {
             // expand the credit window by one.
-            endpoint.flow(1);
+            getEndpoint().flow(1);
         }
     }
 
@@ -292,7 +295,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo,
Receiver
     public void processDeliveryUpdates() throws IOException {
         Delivery incoming = null;
         do {
-            incoming = endpoint.current();
+            incoming = getEndpoint().current();
             if (incoming != null && incoming.isReadable() && !incoming.isPartial())
{
                 LOG.trace("{} has incoming Message(s).", this);
                 try {
@@ -300,7 +303,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo,
Receiver
                 } catch (Exception e) {
                     throw IOExceptionSupport.create(e);
                 }
-                endpoint.advance();
+                getEndpoint().advance();
             } else {
                 LOG.trace("{} has a partial incoming Message(s), deferring.", this);
                 incoming = null;
@@ -348,9 +351,9 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo,
Receiver
     @Override
     protected void doClose() {
         if (resource.isDurable()) {
-            this.endpoint.detach();
+            getEndpoint().detach();
         } else {
-            this.endpoint.close();
+            getEndpoint().close();
         }
     }
 
@@ -371,7 +374,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo,
Receiver
     }
 
     public Receiver getProtonReceiver() {
-        return this.endpoint;
+        return this.getEndpoint();
     }
 
     public boolean isBrowser() {
@@ -398,7 +401,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo,
Receiver
         incoming.disposition(disposition);
         incoming.settle();
         if (expandCredit) {
-            endpoint.flow(1);
+            getEndpoint().flow(1);
         }
     }
 
@@ -420,7 +423,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo,
Receiver
     protected Message decodeIncomingMessage(Delivery incoming) {
         int count;
 
-        while ((count = endpoint.recv(incomingBuffer.array(), incomingBuffer.writerIndex(),
incomingBuffer.writableBytes())) > 0) {
+        while ((count = getEndpoint().recv(incomingBuffer.array(), incomingBuffer.writerIndex(),
incomingBuffer.writableBytes())) > 0) {
             incomingBuffer.writerIndex(incomingBuffer.writerIndex() + count);
             if (!incomingBuffer.isWritable()) {
                 incomingBuffer.capacity((int) (incomingBuffer.capacity() * 1.5));

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f2b3a5da/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
index 7eb6683..8cc5a74 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
@@ -86,7 +86,7 @@ public class AmqpFixedProducer extends AmqpProducer {
         // TODO - Handle the case where remote has no credit which means we can't send to
it.
         //        We need to hold the send until remote credit becomes available but we should
         //        also have a send timeout option and filter timed out sends.
-        if (endpoint.getCredit() <= 0) {
+        if (getEndpoint().getCredit() <= 0) {
             LOG.trace("Holding Message send until credit is available.");
             // Once a message goes into a held mode we no longer can send it async, so
             // we clear the async flag if set to avoid the sender never getting notified.
@@ -108,9 +108,9 @@ public class AmqpFixedProducer extends AmqpProducer {
         Delivery delivery = null;
 
         if (presettle) {
-            delivery = endpoint.delivery(EMPTY_BYTE_ARRAY, 0, 0);
+            delivery = getEndpoint().delivery(EMPTY_BYTE_ARRAY, 0, 0);
         } else {
-            delivery = endpoint.delivery(tag, 0, tag.length);
+            delivery = getEndpoint().delivery(tag, 0, tag.length);
         }
 
         delivery.setContext(request);
@@ -129,7 +129,7 @@ public class AmqpFixedProducer extends AmqpProducer {
             delivery.settle();
         } else {
             pending.add(delivery);
-            endpoint.advance();
+            getEndpoint().advance();
         }
 
         if (envelope.isSendAsync() || presettle) {
@@ -152,7 +152,7 @@ public class AmqpFixedProducer extends AmqpProducer {
         int sentSoFar = 0;
 
         while (true) {
-            int sent = endpoint.send(encodeBuffer, sentSoFar, encodedSize - sentSoFar);
+            int sent = getEndpoint().send(encodeBuffer, sentSoFar, encodedSize - sentSoFar);
             if (sent > 0) {
                 sentSoFar += sent;
                 if ((encodedSize - sentSoFar) == 0) {
@@ -166,8 +166,8 @@ public class AmqpFixedProducer extends AmqpProducer {
 
     @Override
     public void processFlowUpdates() throws IOException {
-        if (!pendingSends.isEmpty() && endpoint.getCredit() > 0) {
-            while (endpoint.getCredit() > 0 && !pendingSends.isEmpty()) {
+        if (!pendingSends.isEmpty() && getEndpoint().getCredit() > 0) {
+            while (getEndpoint().getCredit() > 0 && !pendingSends.isEmpty()) {
                 LOG.trace("Dispatching previously held send");
                 PendingSend held = pendingSends.pop();
                 try {
@@ -248,15 +248,19 @@ public class AmqpFixedProducer extends AmqpProducer {
         target.setAddress(targetAddress);
 
         String senderName = sourceAddress + ":" + targetAddress;
-        endpoint = session.getProtonSession().sender(senderName);
-        endpoint.setSource(source);
-        endpoint.setTarget(target);
+
+        Sender sender = session.getProtonSession().sender(senderName);
+        sender.setSource(source);
+        sender.setTarget(target);
         if (presettle) {
-            endpoint.setSenderSettleMode(SenderSettleMode.SETTLED);
+            sender.setSenderSettleMode(SenderSettleMode.SETTLED);
         } else {
-            endpoint.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+            sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
         }
-        endpoint.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+        sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+
+        setEndpoint(sender);
+
         super.doOpen();
     }
 
@@ -265,7 +269,7 @@ public class AmqpFixedProducer extends AmqpProducer {
     }
 
     public Sender getProtonSender() {
-        return this.endpoint;
+        return this.getEndpoint();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f2b3a5da/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java
index 6434bbd..5daeb02 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java
@@ -46,7 +46,7 @@ public class AmqpQueueBrowser extends AmqpConsumer {
      */
     @Override
     public void start(AsyncResult request) {
-        this.endpoint.flow(resource.getPrefetchSize());
+        getEndpoint().flow(resource.getPrefetchSize());
         request.onSuccess();
     }
 
@@ -64,17 +64,17 @@ public class AmqpQueueBrowser extends AmqpConsumer {
      */
     @Override
     public void pull(long timeout) {
-        if (!endpoint.getDrain() && endpoint.current() == null && endpoint.getUnsettled()
== 0) {
+        if (!getEndpoint().getDrain() && getEndpoint().current() == null &&
getEndpoint().getUnsettled() == 0) {
             LOG.trace("QueueBrowser {} will try to drain remote.", getConsumerId());
-            this.endpoint.drain(resource.getPrefetchSize());
+            getEndpoint().drain(resource.getPrefetchSize());
         } else {
-            endpoint.setDrain(false);
+            getEndpoint().setDrain(false);
         }
     }
 
     @Override
     public void processFlowUpdates() throws IOException {
-        if (endpoint.getDrain() && endpoint.getCredit() == endpoint.getRemoteCredit())
{
+        if (getEndpoint().getDrain() && getEndpoint().getCredit() == getEndpoint().getRemoteCredit())
{
             JmsInboundMessageDispatch browseDone = new JmsInboundMessageDispatch(getNextIncomingSequenceNumber());
             browseDone.setConsumerId(getConsumerId());
             try {
@@ -83,7 +83,7 @@ public class AmqpQueueBrowser extends AmqpConsumer {
                 throw IOExceptionSupport.create(e);
             }
         } else {
-            endpoint.setDrain(false);
+            getEndpoint().setDrain(false);
         }
 
         super.processFlowUpdates();
@@ -91,14 +91,14 @@ public class AmqpQueueBrowser extends AmqpConsumer {
 
     @Override
     public void processDeliveryUpdates() throws IOException {
-        if (endpoint.getDrain() && endpoint.current() != null) {
+        if (getEndpoint().getDrain() && getEndpoint().current() != null) {
             LOG.trace("{} incoming delivery, cancel drain.", getConsumerId());
-            endpoint.setDrain(false);
+            getEndpoint().setDrain(false);
         }
 
         super.processDeliveryUpdates();
 
-        if (endpoint.getDrain() && endpoint.getCredit() == endpoint.getRemoteCredit())
{
+        if (getEndpoint().getDrain() && getEndpoint().getCredit() == getEndpoint().getRemoteCredit())
{
             JmsInboundMessageDispatch browseDone = new JmsInboundMessageDispatch(getNextIncomingSequenceNumber());
             browseDone.setConsumerId(getConsumerId());
             try {
@@ -107,7 +107,7 @@ public class AmqpQueueBrowser extends AmqpConsumer {
                 throw IOExceptionSupport.create(e);
             }
         } else {
-            endpoint.setDrain(false);
+            getEndpoint().setDrain(false);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f2b3a5da/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
index 5f7ad07..3ffd792 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
@@ -67,7 +67,7 @@ public class AmqpSession extends AmqpAbstractResource<JmsSessionInfo,
Session> {
 
     @Override
     protected void doOpen() {
-        this.endpoint.setIncomingCapacity(Integer.MAX_VALUE);
+        this.getEndpoint().setIncomingCapacity(Integer.MAX_VALUE);
         this.connection.addSession(this);
         super.doOpen();
     }
@@ -292,7 +292,7 @@ public class AmqpSession extends AmqpAbstractResource<JmsSessionInfo,
Session> {
     }
 
     public Session getProtonSession() {
-        return this.endpoint;
+        return this.getEndpoint();
     }
 
     boolean isTransacted() {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f2b3a5da/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java
index 2cf1cb9..fca448d 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java
@@ -58,7 +58,7 @@ public class AmqpTemporaryDestination extends AmqpAbstractResource<JmsDestinatio
         // TODO - We might want to check on our producer to see if it becomes closed
         //        which might indicate that the broker purged the temporary destination.
 
-        EndpointState remoteState = endpoint.getRemoteState();
+        EndpointState remoteState = getEndpoint().getRemoteState();
         if (remoteState == EndpointState.ACTIVE) {
             LOG.trace("Temporary Destination: {} is now open", this.resource);
             opened();
@@ -73,7 +73,7 @@ public class AmqpTemporaryDestination extends AmqpAbstractResource<JmsDestinatio
 
         // Once our producer is opened we can read the updated name from the target address.
         String oldDestinationName = resource.getName();
-        String destinationName = this.endpoint.getRemoteTarget().getAddress();
+        String destinationName = getEndpoint().getRemoteTarget().getAddress();
 
         this.resource.setName(destinationName);
 
@@ -100,11 +100,14 @@ public class AmqpTemporaryDestination extends AmqpAbstractResource<JmsDestinatio
         target.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
 
         String senderName = sourceAddress;
-        endpoint = session.getProtonSession().sender(senderName);
-        endpoint.setSource(source);
-        endpoint.setTarget(target);
-        endpoint.setSenderSettleMode(SenderSettleMode.UNSETTLED);
-        endpoint.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+
+        Sender sender = session.getProtonSession().sender(senderName);
+        sender.setSource(source);
+        sender.setTarget(target);
+        sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+        sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+
+        setEndpoint(sender);
 
         this.connection.addTemporaryDestination(this);
 
@@ -127,7 +130,7 @@ public class AmqpTemporaryDestination extends AmqpAbstractResource<JmsDestinatio
     }
 
     public Sender getProtonSender() {
-        return this.endpoint;
+        return getEndpoint();
     }
 
     public JmsDestination getJmsDestination() {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f2b3a5da/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
index 5492976..a138c97 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
@@ -135,11 +135,15 @@ public class AmqpTransactionContext extends AmqpAbstractResource<JmsSessionInfo,
         Source source = new Source();
 
         String coordinatorName = resource.getSessionId().toString();
-        endpoint = session.getProtonSession().sender(coordinatorName);
-        endpoint.setSource(source);
-        endpoint.setTarget(coordinator);
-        endpoint.setSenderSettleMode(SenderSettleMode.UNSETTLED);
-        endpoint.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+
+        Sender sender = session.getProtonSession().sender(coordinatorName);
+        sender.setSource(source);
+        sender.setTarget(coordinator);
+        sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+        sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+
+        setEndpoint(sender);
+
         super.doOpen();
     }
 
@@ -152,7 +156,7 @@ public class AmqpTransactionContext extends AmqpAbstractResource<JmsSessionInfo,
         Declare declare = new Declare();
         message.setBody(new AmqpValue(declare));
 
-        pendingDelivery = endpoint.delivery(tagGenerator.getNextTag());
+        pendingDelivery = getEndpoint().delivery(tagGenerator.getNextTag());
         pendingRequest = request;
         current = txId;
 
@@ -172,7 +176,7 @@ public class AmqpTransactionContext extends AmqpAbstractResource<JmsSessionInfo,
         discharge.setTxnId((Binary) current.getProviderHint());
         message.setBody(new AmqpValue(discharge));
 
-        pendingDelivery = endpoint.delivery(tagGenerator.getNextTag());
+        pendingDelivery = getEndpoint().delivery(tagGenerator.getNextTag());
         pendingDelivery.setContext(COMMIT_MARKER);
         pendingRequest = request;
 
@@ -192,7 +196,7 @@ public class AmqpTransactionContext extends AmqpAbstractResource<JmsSessionInfo,
         discharge.setTxnId((Binary) current.getProviderHint());
         message.setBody(new AmqpValue(discharge));
 
-        pendingDelivery = endpoint.delivery(tagGenerator.getNextTag());
+        pendingDelivery = getEndpoint().delivery(tagGenerator.getNextTag());
         pendingDelivery.setContext(ROLLBACK_MARKER);
         pendingRequest = request;
 
@@ -260,7 +264,8 @@ public class AmqpTransactionContext extends AmqpAbstractResource<JmsSessionInfo,
             }
         }
 
-        this.endpoint.send(buffer, 0, encodedSize);
-        this.endpoint.advance();
+        Sender sender = getEndpoint();
+        sender.send(buffer, 0, encodedSize);
+        sender.advance();
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message