activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-6044
Date Mon, 23 Nov 2015 21:59:08 GMT
Repository: activemq
Updated Branches:
  refs/heads/master bc9edf00d -> d9e22a936


https://issues.apache.org/jira/browse/AMQ-6044

Clean up some test client code.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/d9e22a93
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/d9e22a93
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/d9e22a93

Branch: refs/heads/master
Commit: d9e22a9368cdaa67501374a41e9ccafc2629f488
Parents: bc9edf0
Author: Timothy Bish <tabish121@gmail.com>
Authored: Mon Nov 23 16:55:11 2015 -0500
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Mon Nov 23 16:55:11 2015 -0500

----------------------------------------------------------------------
 .../amqp/client/AmqpAbstractResource.java       | 40 +---------
 .../transport/amqp/client/AmqpConnection.java   | 38 +++++-----
 .../transport/amqp/client/AmqpEventSink.java    | 79 ++++++++++++++++++++
 .../transport/amqp/client/AmqpResource.java     | 73 +-----------------
 4 files changed, 102 insertions(+), 128 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/d9e22a93/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java
index 767f792..9d02027 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java
@@ -19,9 +19,6 @@ package org.apache.activemq.transport.amqp.client;
 import java.io.IOException;
 
 import org.apache.activemq.transport.amqp.client.util.AsyncResult;
-import org.apache.qpid.proton.amqp.Symbol;
-import org.apache.qpid.proton.amqp.transport.AmqpError;
-import org.apache.qpid.proton.amqp.transport.ErrorCondition;
 import org.apache.qpid.proton.engine.Endpoint;
 import org.apache.qpid.proton.engine.EndpointState;
 import org.slf4j.Logger;
@@ -142,10 +139,7 @@ public abstract class AmqpAbstractResource<E extends Endpoint>
implements AmqpRe
 
     @Override
     public void remotelyClosed(AmqpConnection connection) {
-        Exception error = getRemoteError();
-        if (error == null) {
-            error = new IOException("Remote has closed without error information");
-        }
+        Exception error = AmqpSupport.convertToException(getEndpoint().getRemoteCondition());
 
         if (endpoint != null) {
             // TODO: if this is a producer/consumer link then we may only be detached,
@@ -192,41 +186,11 @@ public abstract class AmqpAbstractResource<E extends Endpoint>
implements AmqpRe
         return getEndpoint().getRemoteState();
     }
 
-    @Override
     public boolean hasRemoteError() {
         return getEndpoint().getRemoteCondition().getCondition() != null;
     }
 
     @Override
-    public Exception getRemoteError() {
-        String message = getRemoteErrorMessage();
-        Exception remoteError = null;
-        Symbol error = getEndpoint().getRemoteCondition().getCondition();
-        if (error != null) {
-            if (error.equals(AmqpError.UNAUTHORIZED_ACCESS)) {
-                remoteError = new SecurityException(message);
-            } else {
-                remoteError = new Exception(message);
-            }
-        }
-
-        return remoteError;
-    }
-
-    @Override
-    public String getRemoteErrorMessage() {
-        String message = "Received unkown error from remote peer";
-        if (getEndpoint().getRemoteCondition() != null) {
-            ErrorCondition error = getEndpoint().getRemoteCondition();
-            if (error.getDescription() != null && !error.getDescription().isEmpty())
{
-                message = error.getDescription();
-            }
-        }
-
-        return message;
-    }
-
-    @Override
     public void processRemoteOpen(AmqpConnection connection) throws IOException {
         doOpenInspection();
         doOpenCompletion();
@@ -254,7 +218,7 @@ public abstract class AmqpAbstractResource<E extends Endpoint> implements
AmqpRe
             LOG.warn("Open of {} failed: ", this);
             Exception openError;
             if (hasRemoteError()) {
-                openError = getRemoteError();
+                openError = AmqpSupport.convertToException(getEndpoint().getRemoteCondition());
             } else {
                 openError = getOpenAbortException();
             }

http://git-wip-us.apache.org/repos/asf/activemq/blob/d9e22a93/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
index 523fa2a..6c35e4c 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
@@ -594,43 +594,43 @@ public class AmqpConnection extends AmqpAbstractResource<Connection>
implements
                     LOG.trace("New Proton Event: {}", protonEvent.getType());
                 }
 
-                AmqpResource amqpResource = null;
+                AmqpEventSink amqpEventSink = null;
                 switch (protonEvent.getType()) {
                     case CONNECTION_REMOTE_CLOSE:
-                        amqpResource = (AmqpConnection) protonEvent.getConnection().getContext();
-                        amqpResource.processRemoteClose(this);
+                        amqpEventSink = (AmqpEventSink) protonEvent.getConnection().getContext();
+                        amqpEventSink.processRemoteClose(this);
                         break;
                     case CONNECTION_REMOTE_OPEN:
-                        amqpResource = (AmqpConnection) protonEvent.getConnection().getContext();
-                        amqpResource.processRemoteOpen(this);
+                        amqpEventSink = (AmqpEventSink) protonEvent.getConnection().getContext();
+                        amqpEventSink.processRemoteOpen(this);
                         break;
                     case SESSION_REMOTE_CLOSE:
-                        amqpResource = (AmqpSession) protonEvent.getSession().getContext();
-                        amqpResource.processRemoteClose(this);
+                        amqpEventSink = (AmqpEventSink) protonEvent.getSession().getContext();
+                        amqpEventSink.processRemoteClose(this);
                         break;
                     case SESSION_REMOTE_OPEN:
-                        amqpResource = (AmqpSession) protonEvent.getSession().getContext();
-                        amqpResource.processRemoteOpen(this);
+                        amqpEventSink = (AmqpEventSink) protonEvent.getSession().getContext();
+                        amqpEventSink.processRemoteOpen(this);
                         break;
                     case LINK_REMOTE_CLOSE:
-                        amqpResource = (AmqpResource) protonEvent.getLink().getContext();
-                        amqpResource.processRemoteClose(this);
+                        amqpEventSink = (AmqpEventSink) protonEvent.getLink().getContext();
+                        amqpEventSink.processRemoteClose(this);
                         break;
                     case LINK_REMOTE_DETACH:
-                        amqpResource = (AmqpResource) protonEvent.getLink().getContext();
-                        amqpResource.processRemoteDetach(this);
+                        amqpEventSink = (AmqpEventSink) protonEvent.getLink().getContext();
+                        amqpEventSink.processRemoteDetach(this);
                         break;
                     case LINK_REMOTE_OPEN:
-                        amqpResource = (AmqpResource) protonEvent.getLink().getContext();
-                        amqpResource.processRemoteOpen(this);
+                        amqpEventSink = (AmqpEventSink) protonEvent.getLink().getContext();
+                        amqpEventSink.processRemoteOpen(this);
                         break;
                     case LINK_FLOW:
-                        amqpResource = (AmqpResource) protonEvent.getLink().getContext();
-                        amqpResource.processFlowUpdates(this);
+                        amqpEventSink = (AmqpEventSink) protonEvent.getLink().getContext();
+                        amqpEventSink.processFlowUpdates(this);
                         break;
                     case DELIVERY:
-                        amqpResource = (AmqpResource) protonEvent.getLink().getContext();
-                        amqpResource.processDeliveryUpdates(this);
+                        amqpEventSink = (AmqpEventSink) protonEvent.getLink().getContext();
+                        amqpEventSink.processDeliveryUpdates(this);
                         break;
                     default:
                         break;

http://git-wip-us.apache.org/repos/asf/activemq/blob/d9e22a93/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpEventSink.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpEventSink.java
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpEventSink.java
new file mode 100644
index 0000000..f83e5d1
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpEventSink.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import java.io.IOException;
+
+/**
+ * Interface used by classes that want to process AMQP events sent from
+ * the transport layer.
+ */
+public interface AmqpEventSink {
+
+    /**
+     * Event handler for remote peer open of this resource.
+     *
+     * @param connection
+     *        the AmqpConnection instance for easier access to fire events.
+     *
+     * @throws IOException if an error occurs while processing the update.
+     */
+    void processRemoteOpen(AmqpConnection connection) throws IOException;
+
+    /**
+     * Event handler for remote peer detach of this resource.
+     *
+     * @param connection
+     *        the AmqpConnection instance for easier access to fire events.
+     *
+     * @throws IOException if an error occurs while processing the update.
+     */
+    void processRemoteDetach(AmqpConnection connection) throws IOException;
+
+    /**
+     * Event handler for remote peer close of this resource.
+     *
+     * @param connection
+     *        the AmqpConnection instance for easier access to fire events.
+     *
+     * @throws IOException if an error occurs while processing the update.
+     */
+    void processRemoteClose(AmqpConnection connection) throws IOException;
+
+    /**
+     * Called when the Proton Engine signals an Delivery related event has been triggered
+     * for the given endpoint.
+     *
+     * @param connection
+     *        the AmqpConnection instance for easier access to fire events.
+     *
+     * @throws IOException if an error occurs while processing the update.
+     */
+    void processDeliveryUpdates(AmqpConnection connection) throws IOException;
+
+    /**
+     * Called when the Proton Engine signals an Flow related event has been triggered
+     * for the given endpoint.
+     *
+     * @param connection
+     *        the AmqpConnection instance for easier access to fire events.
+     *
+     * @throws IOException if an error occurs while processing the update.
+     */
+    void processFlowUpdates(AmqpConnection connection) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/d9e22a93/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpResource.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpResource.java
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpResource.java
index f20fd7c..af68c2d 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpResource.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpResource.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -16,8 +16,6 @@
  */
 package org.apache.activemq.transport.amqp.client;
 
-import java.io.IOException;
-
 import org.apache.activemq.transport.amqp.client.util.AsyncResult;
 
 /**
@@ -26,7 +24,7 @@ import org.apache.activemq.transport.amqp.client.util.AsyncResult;
  * All AMQP types should implement this interface to allow for control of state
  * and configuration details.
  */
-public interface AmqpResource {
+public interface AmqpResource extends AmqpEventSink {
 
     /**
      * Perform all the work needed to open this resource and store the request
@@ -102,71 +100,4 @@ public interface AmqpResource {
      */
     void failed(Exception cause);
 
-    /**
-     * Event handler for remote peer open of this resource.
-     *
-     * @param connection
-     *        The connection that owns this resource.
-     *
-     * @throws IOException if an error occurs while processing the update.
-     */
-    void processRemoteOpen(AmqpConnection connection) throws IOException;
-
-    /**
-     * Event handler for remote peer detach of this resource.
-     *
-     * @param connection
-     *        The connection that owns this resource.
-     *
-     * @throws IOException if an error occurs while processing the update.
-     */
-    void processRemoteDetach(AmqpConnection connection) throws IOException;
-
-    /**
-     * Event handler for remote peer close of this resource.
-     *
-     * @param connection
-     *        The connection that owns this resource.
-     *
-     * @throws IOException if an error occurs while processing the update.
-     */
-    void processRemoteClose(AmqpConnection connection) throws IOException;
-
-    /**
-     * Called when the Proton Engine signals an Delivery related event has been triggered
-     * for the given endpoint.
-     *
-     * @param connection
-     *        The connection that owns this resource.
-     *
-     * @throws IOException if an error occurs while processing the update.
-     */
-    void processDeliveryUpdates(AmqpConnection connection) throws IOException;
-
-    /**
-     * Called when the Proton Engine signals an Flow related event has been triggered
-     * for the given endpoint.
-     *
-     * @param connection
-     *        The connection that owns this resource.
-     *
-     * @throws IOException if an error occurs while processing the update.
-     */
-    void processFlowUpdates(AmqpConnection connection) throws IOException;
-
-    /**
-     * @returns true if the remote end has sent an error
-     */
-    boolean hasRemoteError();
-
-    /**
-     * @return an Exception derived from the error state of the endpoint's Remote Condition.
-     */
-    Exception getRemoteError();
-
-    /**
-     * @return an Error message derived from the error state of the endpoint's Remote Condition.
-     */
-    String getRemoteErrorMessage();
-
 }


Mime
View raw message