qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject qpid-jms git commit: QPIDJMS-395 Add failed state in provider to prevent possible race
Date Tue, 19 Jun 2018 19:03:36 GMT
Repository: qpid-jms
Updated Branches:
  refs/heads/master e676248c7 -> 4bfe7f5ae


QPIDJMS-395 Add failed state in provider to prevent possible race

Adds a failed state to the AMQP provider so that any queued actions,
such as a send, which race into the provider between the time a
connection error is detected and the time the failover bits close the
provider receive a consistent exception that allows normal failover
processing to occur.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/4bfe7f5a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/4bfe7f5a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/4bfe7f5a

Branch: refs/heads/master
Commit: 4bfe7f5ae60680d220dbe2b7cbcfeb824b5e6e9d
Parents: e676248
Author: Timothy Bish <tabish121@gmail.com>
Authored: Tue Jun 19 15:03:15 2018 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Tue Jun 19 15:03:15 2018 -0400

----------------------------------------------------------------------
 .../java/org/apache/qpid/jms/JmsConnection.java |  5 +-
 .../jms/provider/ProviderFailedException.java   | 32 ++++++++++
 .../qpid/jms/provider/amqp/AmqpProvider.java    | 62 +++++++++++---------
 3 files changed, 70 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/4bfe7f5a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
index 338ad5d..52c314d 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
@@ -78,6 +78,7 @@ import org.apache.qpid.jms.provider.AsyncResult;
 import org.apache.qpid.jms.provider.Provider;
 import org.apache.qpid.jms.provider.ProviderClosedException;
 import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
+import org.apache.qpid.jms.provider.ProviderFailedException;
 import org.apache.qpid.jms.provider.ProviderFuture;
 import org.apache.qpid.jms.provider.ProviderListener;
 import org.apache.qpid.jms.provider.ProviderSynchronization;
@@ -229,8 +230,8 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
                             }
                             LOG.debug("Failed destroying Connection resource: {}", ex.getMessage());
                         }
-                    } catch(ProviderClosedException pce) {
-                        LOG.debug("Ignoring provider closed exception during connection close");
+                    } catch(ProviderClosedException | ProviderFailedException pfe) {
+                        LOG.debug("Ignoring provider exception during connection close");
                     } finally {
                         requests.remove(request);
                     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/4bfe7f5a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFailedException.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFailedException.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFailedException.java
new file mode 100644
index 0000000..702dbb4
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFailedException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider;
+
+import java.io.IOException;
+
+public class ProviderFailedException extends IOException {
+
+    private static final long serialVersionUID = 1L;
+
+    public ProviderFailedException(String message) {
+        super(message);
+    }
+
+    public ProviderFailedException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/4bfe7f5a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index 46c95ab..7153bbe 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -58,6 +58,7 @@ import org.apache.qpid.jms.provider.NoOpAsyncResult;
 import org.apache.qpid.jms.provider.Provider;
 import org.apache.qpid.jms.provider.ProviderClosedException;
 import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
+import org.apache.qpid.jms.provider.ProviderFailedException;
 import org.apache.qpid.jms.provider.ProviderFuture;
 import org.apache.qpid.jms.provider.ProviderListener;
 import org.apache.qpid.jms.provider.amqp.builders.AmqpClosedConnectionBuilder;
@@ -135,6 +136,7 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
 
     private final URI remoteURI;
     private final AtomicBoolean closed = new AtomicBoolean();
+    private volatile Throwable failureCause;
     private ScheduledThreadPoolExecutor serializer;
     private final org.apache.qpid.proton.engine.Transport protonTransport =
         org.apache.qpid.proton.engine.Transport.Factory.create();
@@ -166,7 +168,7 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
 
     @Override
     public void connect(final JmsConnectionInfo connectionInfo) throws IOException {
-        checkClosed();
+        checkClosedOrFailed();
 
         final ProviderFuture connectRequest = new ProviderFuture();
 
@@ -288,7 +290,7 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
 
     @Override
     public void start() throws IOException, IllegalStateException {
-        checkClosed();
+        checkClosedOrFailed();
 
         if (listener == null) {
             throw new IllegalStateException("No ProviderListener registered.");
@@ -381,13 +383,13 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
 
     @Override
     public void create(final JmsResource resource, final AsyncResult request) throws IOException, JMSException {
-        checkClosed();
+        checkClosedOrFailed();
         serializer.execute(new Runnable() {
 
             @Override
             public void run() {
                 try {
-                    checkClosed();
+                    checkClosedOrFailed();
                     resource.visit(new JmsResourceVistor() {
                         @Override
                         public void processSessionInfo(JmsSessionInfo sessionInfo) throws Exception {
@@ -465,13 +467,13 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
 
     @Override
     public void start(final JmsResource resource, final AsyncResult request) throws IOException {
-        checkClosed();
+        checkClosedOrFailed();
         serializer.execute(new Runnable() {
 
             @Override
             public void run() {
                 try {
-                    checkClosed();
+                    checkClosedOrFailed();
                     resource.visit(new JmsDefaultResourceVisitor() {
 
                         @Override
@@ -492,13 +494,13 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
 
     @Override
     public void stop(final JmsResource resource, final AsyncResult request) throws IOException {
-        checkClosed();
+        checkClosedOrFailed();
         serializer.execute(new Runnable() {
 
             @Override
             public void run() {
                 try {
-                    checkClosed();
+                    checkClosedOrFailed();
                     resource.visit(new JmsDefaultResourceVisitor() {
 
                         @Override
@@ -519,13 +521,13 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
 
     @Override
     public void destroy(final JmsResource resource, final AsyncResult request) throws IOException {
-        checkClosed();
+        checkClosedOrFailed();
         serializer.execute(new Runnable() {
 
             @Override
             public void run() {
                 try {
-                    checkClosed();
+                    checkClosedOrFailed();
                     resource.visit(new JmsDefaultResourceVisitor() {
 
                         @Override
@@ -621,13 +623,13 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
 
     @Override
     public void send(final JmsOutboundMessageDispatch envelope, final AsyncResult request) throws IOException {
-        checkClosed();
+        checkClosedOrFailed();
         serializer.execute(new Runnable() {
 
             @Override
             public void run() {
                 try {
-                    checkClosed();
+                    checkClosedOrFailed();
 
                     JmsProducerId producerId = envelope.getProducerId();
                     AmqpProducer producer = null;
@@ -649,13 +651,13 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
 
     @Override
     public void acknowledge(final JmsSessionId sessionId, final ACK_TYPE ackType, final AsyncResult request) throws IOException {
-        checkClosed();
+        checkClosedOrFailed();
         serializer.execute(new Runnable() {
 
             @Override
             public void run() {
                 try {
-                    checkClosed();
+                    checkClosedOrFailed();
                     AmqpSession amqpSession = connection.getSession(sessionId);
                     amqpSession.acknowledge(ackType);
                     pumpToProtonTransport(request);
@@ -669,13 +671,13 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
 
     @Override
     public void acknowledge(final JmsInboundMessageDispatch envelope, final ACK_TYPE ackType, final AsyncResult request) throws IOException {
-        checkClosed();
+        checkClosedOrFailed();
         serializer.execute(new Runnable() {
 
             @Override
             public void run() {
                 try {
-                    checkClosed();
+                    checkClosedOrFailed();
 
                     JmsConsumerId consumerId = envelope.getConsumerId();
                     AmqpConsumer consumer = null;
@@ -705,13 +707,13 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
 
     @Override
     public void commit(final JmsTransactionInfo transactionInfo, JmsTransactionInfo nextTransactionId, final AsyncResult request) throws IOException {
-        checkClosed();
+        checkClosedOrFailed();
         serializer.execute(new Runnable() {
 
             @Override
             public void run() {
                 try {
-                    checkClosed();
+                    checkClosedOrFailed();
                     AmqpSession session = connection.getSession(transactionInfo.getSessionId());
                     session.commit(transactionInfo, nextTransactionId, request);
                     pumpToProtonTransport(request);
@@ -724,13 +726,13 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
 
     @Override
     public void rollback(final JmsTransactionInfo transactionInfo, JmsTransactionInfo nextTransactionId, final AsyncResult request) throws IOException {
-        checkClosed();
+        checkClosedOrFailed();
         serializer.execute(new Runnable() {
 
             @Override
             public void run() {
                 try {
-                    checkClosed();
+                    checkClosedOrFailed();
                     AmqpSession session = connection.getSession(transactionInfo.getSessionId());
                     session.rollback(transactionInfo, nextTransactionId, request);
                     pumpToProtonTransport(request);
@@ -743,13 +745,13 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
 
     @Override
     public void recover(final JmsSessionId sessionId, final AsyncResult request) throws IOException {
-        checkClosed();
+        checkClosedOrFailed();
         serializer.execute(new Runnable() {
 
             @Override
             public void run() {
                 try {
-                    checkClosed();
+                    checkClosedOrFailed();
                     AmqpSession session = connection.getSession(sessionId);
                     session.recover();
                     pumpToProtonTransport(request);
@@ -763,13 +765,13 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
 
     @Override
     public void unsubscribe(final String subscription, final AsyncResult request) throws IOException {
-        checkClosed();
+        checkClosedOrFailed();
         serializer.execute(new Runnable() {
 
             @Override
             public void run() {
                 try {
-                    checkClosed();
+                    checkClosedOrFailed();
                     connection.unsubscribe(subscription, request);
                     pumpToProtonTransport(request);
                 } catch (Throwable t) {
@@ -781,13 +783,13 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
 
     @Override
     public void pull(final JmsConsumerId consumerId, final long timeout, final AsyncResult request) throws IOException {
-        checkClosed();
+        checkClosedOrFailed();
         serializer.execute(new Runnable() {
 
             @Override
             public void run() {
                 try {
-                    checkClosed();
+                    checkClosedOrFailed();
                     AmqpConsumer consumer = null;
 
                     if (consumerId.getProviderHint() instanceof AmqpConsumer) {
@@ -1103,6 +1105,8 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
             nextIdleTimeoutCheck = null;
         }
 
+        failureCause = ex;
+
         ProviderListener listener = this.listener;
         if (listener != null) {
             listener.onConnectionFailure(IOExceptionSupport.create(ex));
@@ -1438,10 +1442,14 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
 
     //----- Internal implementation ------------------------------------------//
 
-    private void checkClosed() throws ProviderClosedException {
+    private void checkClosedOrFailed() throws ProviderClosedException, ProviderFailedException {
         if (closed.get()) {
             throw new ProviderClosedException("This Provider is already closed");
         }
+
+        if (failureCause != null) {
+            throw new ProviderFailedException("The Provider has failed", failureCause);
+        }
     }
 
     private Mechanism findSaslMechanism(String[] remoteMechanisms) throws JMSSecurityRuntimeException {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message