cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ff...@apache.org
Subject cxf git commit: [CXF-7109] ClientCallback may be invoked twice when Async HTTP Transport is used
Date Wed, 26 Oct 2016 06:04:37 GMT
Repository: cxf
Updated Branches:
  refs/heads/3.1.x-fixes 4537bb27c -> 2e2bfe121


[CXF-7109] ClientCallback may be invoked twice when Async HTTP Transport is used

(cherry picked from commit 4eb81d3044c0f663d580cdbd3b611d5e3b1b4ac5)


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/2e2bfe12
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/2e2bfe12
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/2e2bfe12

Branch: refs/heads/3.1.x-fixes
Commit: 2e2bfe1211b521765458401eb49101a2857036ff
Parents: 4537bb2
Author: Tadayoshi Sato <sato.tadayoshi@gmail.com>
Authored: Tue Oct 25 18:45:42 2016 +0900
Committer: Freeman Fang <freeman.fang@gmail.com>
Committed: Wed Oct 26 11:35:06 2016 +0800

----------------------------------------------------------------------
 .../org/apache/cxf/endpoint/ClientImpl.java     | 14 ++++++++++---
 .../cxf/interceptor/ClientOutFaultObserver.java |  3 ++-
 .../cxf/message/AbstractWrappedMessage.java     |  3 +++
 .../org/apache/cxf/message/ExchangeImpl.java    |  4 ++++
 .../java/org/apache/cxf/message/StringMap.java  |  7 +++++++
 .../org/apache/cxf/message/StringMapImpl.java   |  4 ++++
 .../http/asyncclient/AsyncHTTPConduitTest.java  | 21 ++++++++++++++++++++
 .../apache/cxf/transport/http/HTTPConduit.java  | 11 +++++-----
 8 files changed, 58 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/2e2bfe12/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java b/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java
index 428f794..848d3bc 100644
--- a/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java
+++ b/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java
@@ -791,8 +791,11 @@ public class ClientImpl
                         if (resCtx != null) {
                             responseContext.put(Thread.currentThread(), resCtx);
                         }
-                        callback.handleException(resCtx, error);
-
+                        // remove callback so that it won't be invoked twice
+                        callback = message.getExchange().remove(ClientCallback.class);
+                        if (callback != null) {
+                            callback.handleException(resCtx, error);
+                        }
                     }
                 } else {
                     chain.doIntercept(message);
@@ -801,8 +804,13 @@ public class ClientImpl
             }
 
             callback = message.getExchange().get(ClientCallback.class);
+            if (callback == null || isPartialResponse(message)) {
+                return;
+            }
 
-            if (callback != null && !isPartialResponse(message)) {
+            // remove callback so that it won't be invoked twice
+            callback = message.getExchange().remove(ClientCallback.class);
+            if (callback != null) {
                 message.getExchange().setInMessage(message);
                 Map<String, Object> resCtx = CastUtils.cast((Map<?, ?>)message
                                                                 .getExchange()

http://git-wip-us.apache.org/repos/asf/cxf/blob/2e2bfe12/core/src/main/java/org/apache/cxf/interceptor/ClientOutFaultObserver.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/cxf/interceptor/ClientOutFaultObserver.java b/core/src/main/java/org/apache/cxf/interceptor/ClientOutFaultObserver.java
index 0c879c5..f139da9 100644
--- a/core/src/main/java/org/apache/cxf/interceptor/ClientOutFaultObserver.java
+++ b/core/src/main/java/org/apache/cxf/interceptor/ClientOutFaultObserver.java
@@ -50,7 +50,8 @@ public class ClientOutFaultObserver extends AbstractFaultChainInitiatorObserver
             return;
         }
         Exception ex = m.getContent(Exception.class);
-        ClientCallback callback = m.getExchange().get(ClientCallback.class);
+        // remove callback so that it won't be invoked twice
+        ClientCallback callback = m.getExchange().remove(ClientCallback.class);
 
         if (callback != null) {
             Map<String, Object> resCtx = CastUtils.cast((Map<?, ?>) m.getExchange().getOutMessage().get(

http://git-wip-us.apache.org/repos/asf/cxf/blob/2e2bfe12/core/src/main/java/org/apache/cxf/message/AbstractWrappedMessage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/cxf/message/AbstractWrappedMessage.java b/core/src/main/java/org/apache/cxf/message/AbstractWrappedMessage.java
index 496b97b..094e432 100644
--- a/core/src/main/java/org/apache/cxf/message/AbstractWrappedMessage.java
+++ b/core/src/main/java/org/apache/cxf/message/AbstractWrappedMessage.java
@@ -157,6 +157,9 @@ public abstract class AbstractWrappedMessage implements Message {
     public <T> void put(Class<T> key, T value) {
         message.put(key, value);
     }
+    public <T> T remove(Class<T> key) {
+        return message.remove(key);
+    }
 
     public Object getContextualProperty(String key) {
         return message.getContextualProperty(key);

http://git-wip-us.apache.org/repos/asf/cxf/blob/2e2bfe12/core/src/main/java/org/apache/cxf/message/ExchangeImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/cxf/message/ExchangeImpl.java b/core/src/main/java/org/apache/cxf/message/ExchangeImpl.java
index d2abd7a..4798827 100644
--- a/core/src/main/java/org/apache/cxf/message/ExchangeImpl.java
+++ b/core/src/main/java/org/apache/cxf/message/ExchangeImpl.java
@@ -159,6 +159,10 @@ public class ExchangeImpl extends ConcurrentHashMap<String, Object> implements
         return super.put(key, value);
     }
 
+    public <T> T remove(Class<T> key) {
+        return key.cast(super.remove(key.getName()));
+    }
+
     private void setMessageContextProperty(Message m, String key, Object value) {
         if (m == null) {
             return;

http://git-wip-us.apache.org/repos/asf/cxf/blob/2e2bfe12/core/src/main/java/org/apache/cxf/message/StringMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/cxf/message/StringMap.java b/core/src/main/java/org/apache/cxf/message/StringMap.java
index 4d802b8..87115fe 100644
--- a/core/src/main/java/org/apache/cxf/message/StringMap.java
+++ b/core/src/main/java/org/apache/cxf/message/StringMap.java
@@ -38,4 +38,11 @@ public interface StringMap extends Map<String, Object> {
      * @param value the value
      */
     <T> void put(Class<T> key, T value);
+
+    /**
+     * Convenience method for removing typed objects from the map.
+     * equivalent to:  (T)remove(key.getName());
+     * @param key the key
+     */
+    <T> T remove(Class<T> key);
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/2e2bfe12/core/src/main/java/org/apache/cxf/message/StringMapImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/cxf/message/StringMapImpl.java b/core/src/main/java/org/apache/cxf/message/StringMapImpl.java
index c1428e4..f3acdc1 100644
--- a/core/src/main/java/org/apache/cxf/message/StringMapImpl.java
+++ b/core/src/main/java/org/apache/cxf/message/StringMapImpl.java
@@ -46,4 +46,8 @@ public class StringMapImpl
     public <T> void put(Class<T> key, T value) {
         put(key.getName(), value);
     }
+
+    public <T> T remove(Class<T> key) {
+        return key.cast(remove(key.getName()));
+    }
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/2e2bfe12/rt/transports/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java
----------------------------------------------------------------------
diff --git a/rt/transports/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java b/rt/transports/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java
index 3341e3e..d4a25f6 100644
--- a/rt/transports/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java
+++ b/rt/transports/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java
@@ -23,6 +23,7 @@ import java.net.URL;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.xml.ws.AsyncHandler;
 import javax.xml.ws.Endpoint;
@@ -197,6 +198,26 @@ public class AsyncHTTPConduitTest extends AbstractBusClientServerTestBase {
             }
         }).get();
     }
+
+    @Test
+    public void testCallAsyncCallbackInvokedOnlyOnce() throws Exception {
+        // This test is especially targeted for RHEL 6.8
+        updateAddressPort(g, PORT_INV);
+        int repeat = 100;
+        final AtomicInteger count = new AtomicInteger(0);
+        for (int i = 0; i < repeat; i++) {
+            try {
+                g.greetMeAsync(request, new AsyncHandler<GreetMeResponse>() {
+                    public void handleResponse(Response<GreetMeResponse> res) {
+                        count.incrementAndGet();
+                    }
+                }).get();
+            } catch (Exception e) {
+            }
+        }
+        Thread.sleep(1000);
+        assertEquals("Callback should be invoked only once per request", repeat, count.intValue());
+    }
         
     @Test
     @Ignore("peformance test")

http://git-wip-us.apache.org/repos/asf/cxf/blob/2e2bfe12/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java
----------------------------------------------------------------------
diff --git a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java
index adebe09..e5aa543 100644
--- a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java
+++ b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java
@@ -1626,12 +1626,13 @@ public abstract class HTTPConduit
                     if (isOneway(exchange) && responseCode > 300) {
                         throw new HTTPException(responseCode, getResponseMessage(), url.toURL());
                     }
-                    ClientCallback cc = exchange.get(ClientCallback.class);
-                    if (null != cc) {
-                        //REVISIT move the decoupled destination property name into api
-                        Endpoint ep = exchange.getEndpoint();
-                        if (null != ep && null != ep.getEndpointInfo() && null == ep.getEndpointInfo().
+                    //REVISIT move the decoupled destination property name into api
+                    Endpoint ep = exchange.getEndpoint();
+                    if (null != ep && null != ep.getEndpointInfo() && null == ep.getEndpointInfo().
                             getProperty("org.apache.cxf.ws.addressing.MAPAggregator.decoupledDestination")) {
+                        // remove callback so that it won't be invoked twice
+                        ClientCallback cc = exchange.remove(ClientCallback.class);
+                        if (null != cc) {
                             cc.handleResponse(null, null);
                         }
                     }


Mime
View raw message