cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From serg...@apache.org
Subject cxf git commit: Updating to m03 jaxrs 2.1
Date Wed, 01 Feb 2017 12:33:36 GMT
Repository: cxf
Updated Branches:
  refs/heads/master 3bebf6b49 -> ea8b6e270


Updating to m03 jaxrs 2.1


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/ea8b6e27
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/ea8b6e27
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/ea8b6e27

Branch: refs/heads/master
Commit: ea8b6e270c3a4a62061c4269174b6326a57ea2b5
Parents: 3bebf6b
Author: Sergey Beryozkin <sberyozkin@gmail.com>
Authored: Wed Feb 1 12:33:14 2017 +0000
Committer: Sergey Beryozkin <sberyozkin@gmail.com>
Committed: Wed Feb 1 12:33:14 2017 +0000

----------------------------------------------------------------------
 parent/pom.xml                                  |   2 +-
 .../apache/cxf/jaxrs/client/AsyncClient.java    |   2 +-
 .../cxf/jaxrs/client/AsyncInvokerImpl.java      | 218 +++++++++
 .../cxf/jaxrs/client/ClientProviderFactory.java |  12 +-
 .../client/CompletionStageRxInvokerImpl.java    | 163 +++++++
 .../cxf/jaxrs/client/SyncInvokerImpl.java       | 161 +++++++
 .../org/apache/cxf/jaxrs/client/WebClient.java  | 473 +------------------
 .../client/spec/InvocationBuilderImpl.java      |  33 +-
 .../rx/client/ObservableRxInvokerProvider.java  |  16 +-
 .../jaxrs/reactive/JAXRSReactiveTest.java       |  11 +-
 10 files changed, 600 insertions(+), 491 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/ea8b6e27/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index 3427503..e399855 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -109,7 +109,7 @@
         <cxf.geronimo.transaction.version>1.1.1</cxf.geronimo.transaction.version>
         <cxf.jasypt.bundle.version>1.9.0_1</cxf.jasypt.bundle.version>
         <cxf.javassist.version>3.19.0-GA</cxf.javassist.version>
-        <cxf.javax.ws.rs.version>2.1-m02</cxf.javax.ws.rs.version>
+        <cxf.javax.ws.rs.version>2.1-m03</cxf.javax.ws.rs.version>
         <cxf.jaxb.version>2.2.11</cxf.jaxb.version>
         <cxf.jaxb.impl.version>${cxf.jaxb.version}</cxf.jaxb.impl.version>
         <cxf.jaxb.core.version>${cxf.jaxb.version}</cxf.jaxb.core.version>

http://git-wip-us.apache.org/repos/asf/cxf/blob/ea8b6e27/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/AsyncClient.java
----------------------------------------------------------------------
diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/AsyncClient.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/AsyncClient.java
index e81a090..0766718 100644
--- a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/AsyncClient.java
+++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/AsyncClient.java
@@ -20,7 +20,7 @@ package org.apache.cxf.jaxrs.client;
 
 import java.lang.reflect.Type;
 
-//Work in progress
+//Work in progress. May be removed once the Rx client work is finalized
 public interface AsyncClient {
     void prepareAsyncClient(String httpMethod, 
                             Object body,

http://git-wip-us.apache.org/repos/asf/cxf/blob/ea8b6e27/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/AsyncInvokerImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/AsyncInvokerImpl.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/AsyncInvokerImpl.java
new file mode 100644
index 0000000..c51a969
--- /dev/null
+++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/AsyncInvokerImpl.java
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.client;
+
+import java.util.concurrent.Future;
+
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.client.AsyncInvoker;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.InvocationCallback;
+import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.Response;
+
+public class AsyncInvokerImpl implements AsyncInvoker {
+
+    private WebClient wc;
+    
+    public AsyncInvokerImpl(WebClient wc) {
+        this.wc = wc;
+    }
+    @Override
+    public Future<Response> get() {
+        return get(Response.class);
+    }
+
+    @Override
+    public <T> Future<T> get(Class<T> responseType) {
+        return method(HttpMethod.GET, responseType);
+    }
+
+    @Override
+    public <T> Future<T> get(GenericType<T> responseType) {
+        return method(HttpMethod.GET, responseType);
+    }
+
+    @Override
+    public <T> Future<T> get(InvocationCallback<T> callback) {
+        return method(HttpMethod.GET, callback);
+    }
+
+    @Override
+    public Future<Response> put(Entity<?> entity) {
+        return put(entity, Response.class);
+    }
+
+    @Override
+    public <T> Future<T> put(Entity<?> entity, Class<T> responseType) {
+        return method(HttpMethod.PUT, entity, responseType);
+    }
+
+    @Override
+    public <T> Future<T> put(Entity<?> entity, GenericType<T> responseType) {
+        return method(HttpMethod.PUT, entity, responseType);
+    }
+
+    @Override
+    public <T> Future<T> put(Entity<?> entity, InvocationCallback<T> callback) {
+        return method(HttpMethod.PUT, entity, callback);
+    }
+
+    @Override
+    public Future<Response> post(Entity<?> entity) {
+        return post(entity, Response.class);
+    }
+
+    @Override
+    public <T> Future<T> post(Entity<?> entity, Class<T> responseType) {
+        return method(HttpMethod.POST, entity, responseType);
+    }
+
+    @Override
+    public <T> Future<T> post(Entity<?> entity, GenericType<T> responseType) {
+        return method(HttpMethod.POST, entity, responseType);
+    }
+
+    @Override
+    public <T> Future<T> post(Entity<?> entity, InvocationCallback<T> callback) {
+        return method(HttpMethod.POST, entity, callback);
+    }
+
+    @Override
+    public Future<Response> delete() {
+        return delete(Response.class);
+    }
+
+    @Override
+    public <T> Future<T> delete(Class<T> responseType) {
+        return method(HttpMethod.DELETE, responseType);
+    }
+
+    @Override
+    public <T> Future<T> delete(GenericType<T> responseType) {
+        return method(HttpMethod.DELETE, responseType);
+    }
+
+    @Override
+    public <T> Future<T> delete(InvocationCallback<T> callback) {
+        return method(HttpMethod.DELETE, callback);
+    }
+
+    @Override
+    public Future<Response> head() {
+        return method(HttpMethod.HEAD);
+    }
+
+    @Override
+    public Future<Response> head(InvocationCallback<Response> callback) {
+        return method(HttpMethod.HEAD, callback);
+    }
+
+    @Override
+    public Future<Response> options() {
+        return options(Response.class);
+    }
+
+    @Override
+    public <T> Future<T> options(Class<T> responseType) {
+        return method(HttpMethod.OPTIONS, responseType);
+    }
+
+    @Override
+    public <T> Future<T> options(GenericType<T> responseType) {
+        return method(HttpMethod.OPTIONS, responseType);
+    }
+
+    @Override
+    public <T> Future<T> options(InvocationCallback<T> callback) {
+        return method(HttpMethod.OPTIONS, callback);
+    }
+
+    @Override
+    public Future<Response> trace() {
+        return trace(Response.class);
+    }
+
+    @Override
+    public <T> Future<T> trace(Class<T> responseType) {
+        return method("TRACE", responseType);
+    }
+
+    @Override
+    public <T> Future<T> trace(GenericType<T> responseType) {
+        return method("TRACE", responseType);
+    }
+
+    @Override
+    public <T> Future<T> trace(InvocationCallback<T> callback) {
+        return method("TRACE", callback);
+    }
+
+    @Override
+    public Future<Response> method(String name) {
+        return method(name, Response.class);
+    }
+
+    @Override
+    public <T> Future<T> method(String name, Class<T> responseType) {
+        return wc.doInvokeAsync(name, null, null, null, responseType, responseType, null);
+    }
+
+    @Override
+    public <T> Future<T> method(String name, GenericType<T> responseType) {
+        return wc.doInvokeAsync(name, null, null, null, responseType.getRawType(),
+                             responseType.getType(), null);
+    }
+
+    @Override
+    public <T> Future<T> method(String name, InvocationCallback<T> callback) {
+        return wc.doInvokeAsyncCallback(name, null, null, null, callback);
+    }
+
+    @Override
+    public Future<Response> method(String name, Entity<?> entity) {
+        return method(name, entity, Response.class);
+    }
+
+    @Override
+    public <T> Future<T> method(String name, Entity<?> entity, Class<T> responseType) {
+        return wc.doInvokeAsync(name, 
+                             entity, 
+                             null, 
+                             null, responseType, responseType, null);
+    }
+
+    @Override
+    public <T> Future<T> method(String name, Entity<?> entity, GenericType<T> responseType) {
+        return wc.doInvokeAsync(name, 
+                             entity, 
+                             null, 
+                             null, responseType.getRawType(), responseType.getType(), null);
+    }
+
+    @Override
+    public <T> Future<T> method(String name, Entity<?> entity, InvocationCallback<T> callback) {
+        return wc.doInvokeAsyncCallback(name, 
+                                     entity, 
+                                     null,
+                                     null, 
+                                     callback);
+    }
+    
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cxf/blob/ea8b6e27/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/ClientProviderFactory.java
----------------------------------------------------------------------
diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/ClientProviderFactory.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/ClientProviderFactory.java
index dd2522e..f064e5d 100644
--- a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/ClientProviderFactory.java
+++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/ClientProviderFactory.java
@@ -25,6 +25,7 @@ import java.util.List;
 
 import javax.ws.rs.client.ClientRequestFilter;
 import javax.ws.rs.client.ClientResponseFilter;
+import javax.ws.rs.client.RxInvokerProvider;
 import javax.ws.rs.core.Configuration;
 
 import org.apache.cxf.Bus;
@@ -42,7 +43,7 @@ public final class ClientProviderFactory extends ProviderFactory {
         new ArrayList<ProviderInfo<ClientResponseFilter>>(1);
     private List<ProviderInfo<ResponseExceptionMapper<?>>> responseExceptionMappers = 
         new ArrayList<ProviderInfo<ResponseExceptionMapper<?>>>(1);
-    
+    private RxInvokerProvider<?> rxInvokerProvider;
     private ClientProviderFactory(Bus bus) {
         super(bus);
     }
@@ -85,6 +86,10 @@ public final class ClientProviderFactory extends ProviderFactory {
             if (ResponseExceptionMapper.class.isAssignableFrom(providerCls)) {
                 addProviderToList(responseExceptionMappers, provider);
             }
+            
+            if (RxInvokerProvider.class.isAssignableFrom(providerCls)) {
+                this.rxInvokerProvider = RxInvokerProvider.class.cast(provider.getProvider());
+            }
         }
         Collections.sort(clientRequestFilters, 
                          new BindingPriorityComparator(ClientRequestFilter.class, true));
@@ -133,5 +138,8 @@ public final class ClientProviderFactory extends ProviderFactory {
         return (Configuration)m.getExchange().getOutMessage()
             .getContextualProperty(Configuration.class.getName());
     }
-    
+
+    public RxInvokerProvider<?> getRxInvokerProvider() {
+        return rxInvokerProvider;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/ea8b6e27/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/CompletionStageRxInvokerImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/CompletionStageRxInvokerImpl.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/CompletionStageRxInvokerImpl.java
new file mode 100644
index 0000000..6071511
--- /dev/null
+++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/CompletionStageRxInvokerImpl.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.client;
+
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutorService;
+
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.client.CompletionStageRxInvoker;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.Response;
+
+public class CompletionStageRxInvokerImpl implements CompletionStageRxInvoker {
+    private WebClient wc;
+    private ExecutorService ex;
+    CompletionStageRxInvokerImpl(WebClient wc, ExecutorService ex) {
+        this.ex = ex;
+        this.wc = wc;
+    }
+    
+    @Override
+    public CompletionStage<Response> get() {
+        return get(Response.class);
+    }
+
+    @Override
+    public <T> CompletionStage<T> get(Class<T> responseType) {
+        return method(HttpMethod.GET, responseType);
+    }
+
+    @Override
+    public <T> CompletionStage<T> get(GenericType<T> responseType) {
+        return method(HttpMethod.GET, responseType);
+    }
+
+    @Override
+    public CompletionStage<Response> put(Entity<?> entity) {
+        return put(entity, Response.class);
+    }
+
+    @Override
+    public <T> CompletionStage<T> put(Entity<?> entity, Class<T> responseType) {
+        return method(HttpMethod.PUT, entity, responseType);
+    }
+
+    @Override
+    public <T> CompletionStage<T> put(Entity<?> entity, GenericType<T> responseType) {
+        return method(HttpMethod.PUT, entity, responseType);
+    }
+
+    @Override
+    public CompletionStage<Response> post(Entity<?> entity) {
+        return post(entity, Response.class);
+    }
+
+    @Override
+    public <T> CompletionStage<T> post(Entity<?> entity, Class<T> responseType) {
+        return method(HttpMethod.POST, entity, responseType);
+    }
+
+    @Override
+    public <T> CompletionStage<T> post(Entity<?> entity, GenericType<T> responseType) {
+        return method(HttpMethod.POST, entity, responseType);
+    }
+
+    @Override
+    public CompletionStage<Response> delete() {
+        return delete(Response.class);
+    }
+
+    @Override
+    public <T> CompletionStage<T> delete(Class<T> responseType) {
+        return method(HttpMethod.DELETE, responseType);
+    }
+
+    @Override
+    public <T> CompletionStage<T> delete(GenericType<T> responseType) {
+        return method(HttpMethod.DELETE, responseType);
+    }
+
+    @Override
+    public CompletionStage<Response> head() {
+        return method(HttpMethod.HEAD);
+    }
+
+    @Override
+    public CompletionStage<Response> options() {
+        return options(Response.class);
+    }
+
+    @Override
+    public <T> CompletionStage<T> options(Class<T> responseType) {
+        return method(HttpMethod.OPTIONS, responseType);
+    }
+
+    @Override
+    public <T> CompletionStage<T> options(GenericType<T> responseType) {
+        return method(HttpMethod.OPTIONS, responseType);
+    }
+
+    @Override
+    public CompletionStage<Response> trace() {
+        return trace(Response.class);
+    }
+
+    @Override
+    public <T> CompletionStage<T> trace(Class<T> responseType) {
+        return method("TRACE", responseType);
+    }
+
+    @Override
+    public <T> CompletionStage<T> trace(GenericType<T> responseType) {
+        return method("TRACE", responseType);
+    }
+
+    @Override
+    public CompletionStage<Response> method(String name) {
+        return method(name, Response.class);
+    }
+
+    @Override
+    public CompletionStage<Response> method(String name, Entity<?> entity) {
+        return method(name, entity, Response.class);
+    }
+
+    @Override
+    public <T> CompletionStage<T> method(String name, Entity<?> entity, Class<T> responseType) {
+        return wc.doInvokeAsyncStage(name, entity, responseType, responseType, ex);
+    }
+
+    @Override
+    public <T> CompletionStage<T> method(String name, Entity<?> entity, GenericType<T> responseType) {
+        return wc.doInvokeAsyncStage(name, entity, responseType.getRawType(), responseType.getType(), ex);
+    }
+
+    @Override
+    public <T> CompletionStage<T> method(String name, Class<T> responseType) {
+        return wc.doInvokeAsyncStage(name, null, responseType, responseType, ex);
+    }
+
+    @Override
+    public <T> CompletionStage<T> method(String name, GenericType<T> responseType) {
+        return wc.doInvokeAsyncStage(name, null, responseType.getRawType(), responseType.getType(), ex);
+    }
+    
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cxf/blob/ea8b6e27/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/SyncInvokerImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/SyncInvokerImpl.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/SyncInvokerImpl.java
new file mode 100644
index 0000000..e3133cd
--- /dev/null
+++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/SyncInvokerImpl.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.client;
+
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.SyncInvoker;
+import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.Response;
+
+public class SyncInvokerImpl implements SyncInvoker {
+    private WebClient wc;
+    public SyncInvokerImpl(WebClient wc) {
+        this.wc = wc;
+    }
+    
+    @Override
+    public Response delete() {
+        return method(HttpMethod.DELETE);
+    }
+
+    @Override
+    public <T> T delete(Class<T> cls) {
+        return method(HttpMethod.DELETE, cls);
+    }
+
+    @Override
+    public <T> T delete(GenericType<T> genericType) {
+        return method(HttpMethod.DELETE, genericType);
+    }
+
+    @Override
+    public Response get() {
+        return method(HttpMethod.GET);
+    }
+
+    @Override
+    public <T> T get(Class<T> cls) {
+        return method(HttpMethod.GET, cls);
+    }
+
+    @Override
+    public <T> T get(GenericType<T> genericType) {
+        return method(HttpMethod.GET, genericType);
+    }
+
+    @Override
+    public Response head() {
+        return method(HttpMethod.HEAD);
+    }
+
+    @Override
+    public Response options() {
+        return method(HttpMethod.OPTIONS);
+    }
+
+    @Override
+    public <T> T options(Class<T> cls) {
+        return method(HttpMethod.OPTIONS, cls);
+    }
+
+    @Override
+    public <T> T options(GenericType<T> genericType) {
+        return method(HttpMethod.OPTIONS, genericType);
+    }
+
+    @Override
+    public Response post(Entity<?> entity) {
+        return method(HttpMethod.POST, entity);
+    }
+
+    @Override
+    public <T> T post(Entity<?> entity, Class<T> cls) {
+        return method(HttpMethod.POST, entity, cls);
+    }
+
+    @Override
+    public <T> T post(Entity<?> entity, GenericType<T> genericType) {
+        return method(HttpMethod.POST, entity, genericType);
+    }
+
+    @Override
+    public Response put(Entity<?> entity) {
+        return method(HttpMethod.PUT, entity);
+    }
+
+    @Override
+    public <T> T put(Entity<?> entity, Class<T> cls) {
+        return method(HttpMethod.PUT, entity, cls);
+    }
+
+    @Override
+    public <T> T put(Entity<?> entity, GenericType<T> genericType) {
+        return method(HttpMethod.PUT, entity, genericType);
+    }
+
+    @Override
+    public Response trace() {
+        return method("TRACE");
+    }
+
+    @Override
+    public <T> T trace(Class<T> cls) {
+        return method("TRACE", cls);
+    }
+
+    @Override
+    public <T> T trace(GenericType<T> genericType) {
+        return method("TRACE", genericType);
+    }
+    
+    @Override
+    public Response method(String method) {
+        return method(method, Response.class);
+    }
+
+    @Override
+    public <T> T method(String method, Class<T> cls) {
+        return wc.invoke(method, null, cls);
+    }
+
+    @Override
+    public <T> T method(String method, GenericType<T> genericType) {
+        return wc.invoke(method, null, genericType);
+    }
+
+    @Override
+    public Response method(String method, Entity<?> entity) {
+        return method(method, entity, Response.class);
+    }
+
+    @Override
+    public <T> T method(String method, Entity<?> entity, Class<T> cls) {
+        return wc.invoke(method, entity, cls);
+    }
+
+    @Override
+    public <T> T method(String method, Entity<?> entity, GenericType<T> genericType) {
+        return wc.invoke(method, entity, genericType);
+    }
+    
+    public WebClient getWebClient() {
+        return wc;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/ea8b6e27/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java
----------------------------------------------------------------------
diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java
index f7ed1f2..23349f3 100644
--- a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java
+++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java
@@ -1260,12 +1260,12 @@ public class WebClient extends AbstractClient implements AsyncClient {
     
     // Link to JAX-RS 2.0 AsyncInvoker
     public AsyncInvoker async() {
-        return new AsyncInvokerImpl();
+        return new AsyncInvokerImpl(this);
     }
     
     // Link to JAX-RS 2.0 SyncInvoker
     public SyncInvoker sync() {
-        return new SyncInvokerImpl();
+        return new SyncInvokerImpl(this);
     }
     
     // Link to JAX-RS 2.1 CompletionStageRxInvoker
@@ -1273,17 +1273,28 @@ public class WebClient extends AbstractClient implements AsyncClient {
         return rx((ExecutorService)null);
     }
     public CompletionStageRxInvoker rx(ExecutorService ex) {
-        return new CompletionStageRxInvokerImpl(ex);
+        return new CompletionStageRxInvokerImpl(this, ex);
     }
     // Link to JAX-RS 2.1 RxInvoker extensions
     @SuppressWarnings("rawtypes")
-    public <T extends RxInvoker> T rx(RxInvokerProvider<T> p) {
-        return rx(p, (ExecutorService)null);
+    public <T extends RxInvoker> T rx(Class<T> rxCls) {
+        return rx(rxCls, (ExecutorService)null);
     }
 
-    @SuppressWarnings("rawtypes")
-    public <T extends RxInvoker> T rx(RxInvokerProvider<T> p, ExecutorService execService) {
-        return p.getRxInvoker(new InvocationBuilderImpl(this), execService);
+    @SuppressWarnings({
+     "rawtypes", "unchecked"
+    })
+    public <T extends RxInvoker> T rx(Class<T> rxCls, ExecutorService executorService) {
+        if (CompletionStageRxInvoker.class.isAssignableFrom(rxCls)) {
+            return (T)rx(executorService);
+        }
+        ClientProviderFactory pf = 
+            ClientProviderFactory.getInstance(WebClient.getConfig(this).getEndpoint());
+        RxInvokerProvider rxProvider = pf.getRxInvokerProvider();
+        if (rxProvider != null && rxProvider.isProviderFor(rxCls)) {
+            return (T)rxProvider.getRxInvoker(sync(), executorService);
+        }
+        throw new IllegalStateException("Provider for " + rxCls.getName() + " is not available");
     }
     
     private void setEntityHeaders(Entity<?> entity) {
@@ -1296,450 +1307,4 @@ public class WebClient extends AbstractClient implements AsyncClient {
         }
     }
     
-    class AsyncInvokerImpl implements AsyncInvoker {
-
-        @Override
-        public Future<Response> get() {
-            return get(Response.class);
-        }
-
-        @Override
-        public <T> Future<T> get(Class<T> responseType) {
-            return method(HttpMethod.GET, responseType);
-        }
-
-        @Override
-        public <T> Future<T> get(GenericType<T> responseType) {
-            return method(HttpMethod.GET, responseType);
-        }
-
-        @Override
-        public <T> Future<T> get(InvocationCallback<T> callback) {
-            return method(HttpMethod.GET, callback);
-        }
-
-        @Override
-        public Future<Response> put(Entity<?> entity) {
-            return put(entity, Response.class);
-        }
-
-        @Override
-        public <T> Future<T> put(Entity<?> entity, Class<T> responseType) {
-            return method(HttpMethod.PUT, entity, responseType);
-        }
-
-        @Override
-        public <T> Future<T> put(Entity<?> entity, GenericType<T> responseType) {
-            return method(HttpMethod.PUT, entity, responseType);
-        }
-
-        @Override
-        public <T> Future<T> put(Entity<?> entity, InvocationCallback<T> callback) {
-            return method(HttpMethod.PUT, entity, callback);
-        }
-
-        @Override
-        public Future<Response> post(Entity<?> entity) {
-            return post(entity, Response.class);
-        }
-
-        @Override
-        public <T> Future<T> post(Entity<?> entity, Class<T> responseType) {
-            return method(HttpMethod.POST, entity, responseType);
-        }
-
-        @Override
-        public <T> Future<T> post(Entity<?> entity, GenericType<T> responseType) {
-            return method(HttpMethod.POST, entity, responseType);
-        }
-
-        @Override
-        public <T> Future<T> post(Entity<?> entity, InvocationCallback<T> callback) {
-            return method(HttpMethod.POST, entity, callback);
-        }
-
-        @Override
-        public Future<Response> delete() {
-            return delete(Response.class);
-        }
-
-        @Override
-        public <T> Future<T> delete(Class<T> responseType) {
-            return method(HttpMethod.DELETE, responseType);
-        }
-
-        @Override
-        public <T> Future<T> delete(GenericType<T> responseType) {
-            return method(HttpMethod.DELETE, responseType);
-        }
-
-        @Override
-        public <T> Future<T> delete(InvocationCallback<T> callback) {
-            return method(HttpMethod.DELETE, callback);
-        }
-
-        @Override
-        public Future<Response> head() {
-            return method(HttpMethod.HEAD);
-        }
-
-        @Override
-        public Future<Response> head(InvocationCallback<Response> callback) {
-            return method(HttpMethod.HEAD, callback);
-        }
-
-        @Override
-        public Future<Response> options() {
-            return options(Response.class);
-        }
-
-        @Override
-        public <T> Future<T> options(Class<T> responseType) {
-            return method(HttpMethod.OPTIONS, responseType);
-        }
-
-        @Override
-        public <T> Future<T> options(GenericType<T> responseType) {
-            return method(HttpMethod.OPTIONS, responseType);
-        }
-
-        @Override
-        public <T> Future<T> options(InvocationCallback<T> callback) {
-            return method(HttpMethod.OPTIONS, callback);
-        }
-
-        @Override
-        public Future<Response> trace() {
-            return trace(Response.class);
-        }
-
-        @Override
-        public <T> Future<T> trace(Class<T> responseType) {
-            return method("TRACE", responseType);
-        }
-
-        @Override
-        public <T> Future<T> trace(GenericType<T> responseType) {
-            return method("TRACE", responseType);
-        }
-
-        @Override
-        public <T> Future<T> trace(InvocationCallback<T> callback) {
-            return method("TRACE", callback);
-        }
-
-        @Override
-        public Future<Response> method(String name) {
-            return method(name, Response.class);
-        }
-
-        @Override
-        public <T> Future<T> method(String name, Class<T> responseType) {
-            return doInvokeAsync(name, null, null, null, responseType, responseType, null);
-        }
-
-        @Override
-        public <T> Future<T> method(String name, GenericType<T> responseType) {
-            return doInvokeAsync(name, null, null, null, responseType.getRawType(),
-                                 responseType.getType(), null);
-        }
-
-        @Override
-        public <T> Future<T> method(String name, InvocationCallback<T> callback) {
-            return doInvokeAsyncCallback(name, null, null, null, callback);
-        }
-
-        @Override
-        public Future<Response> method(String name, Entity<?> entity) {
-            return method(name, entity, Response.class);
-        }
-
-        @Override
-        public <T> Future<T> method(String name, Entity<?> entity, Class<T> responseType) {
-            return doInvokeAsync(name, 
-                                 entity, 
-                                 null, 
-                                 null, responseType, responseType, null);
-        }
-
-        @Override
-        public <T> Future<T> method(String name, Entity<?> entity, GenericType<T> responseType) {
-            return doInvokeAsync(name, 
-                                 entity, 
-                                 null, 
-                                 null, responseType.getRawType(), responseType.getType(), null);
-        }
-
-        @Override
-        public <T> Future<T> method(String name, Entity<?> entity, InvocationCallback<T> callback) {
-            return doInvokeAsyncCallback(name, 
-                                         entity, 
-                                         null,
-                                         null, 
-                                         callback);
-        }
-    }
-    
-    
-    
-    class SyncInvokerImpl implements SyncInvoker {
-
-        @Override
-        public Response delete() {
-            return method(HttpMethod.DELETE);
-        }
-
-        @Override
-        public <T> T delete(Class<T> cls) {
-            return method(HttpMethod.DELETE, cls);
-        }
-
-        @Override
-        public <T> T delete(GenericType<T> genericType) {
-            return method(HttpMethod.DELETE, genericType);
-        }
-
-        @Override
-        public Response get() {
-            return method(HttpMethod.GET);
-        }
-
-        @Override
-        public <T> T get(Class<T> cls) {
-            return method(HttpMethod.GET, cls);
-        }
-
-        @Override
-        public <T> T get(GenericType<T> genericType) {
-            return method(HttpMethod.GET, genericType);
-        }
-
-        @Override
-        public Response head() {
-            return method(HttpMethod.HEAD);
-        }
-
-        @Override
-        public Response options() {
-            return method(HttpMethod.OPTIONS);
-        }
-
-        @Override
-        public <T> T options(Class<T> cls) {
-            return method(HttpMethod.OPTIONS, cls);
-        }
-
-        @Override
-        public <T> T options(GenericType<T> genericType) {
-            return method(HttpMethod.OPTIONS, genericType);
-        }
-
-        @Override
-        public Response post(Entity<?> entity) {
-            return method(HttpMethod.POST, entity);
-        }
-
-        @Override
-        public <T> T post(Entity<?> entity, Class<T> cls) {
-            return method(HttpMethod.POST, entity, cls);
-        }
-
-        @Override
-        public <T> T post(Entity<?> entity, GenericType<T> genericType) {
-            return method(HttpMethod.POST, entity, genericType);
-        }
-
-        @Override
-        public Response put(Entity<?> entity) {
-            return method(HttpMethod.PUT, entity);
-        }
-
-        @Override
-        public <T> T put(Entity<?> entity, Class<T> cls) {
-            return method(HttpMethod.PUT, entity, cls);
-        }
-
-        @Override
-        public <T> T put(Entity<?> entity, GenericType<T> genericType) {
-            return method(HttpMethod.PUT, entity, genericType);
-        }
-
-        @Override
-        public Response trace() {
-            return method("TRACE");
-        }
-
-        @Override
-        public <T> T trace(Class<T> cls) {
-            return method("TRACE", cls);
-        }
-
-        @Override
-        public <T> T trace(GenericType<T> genericType) {
-            return method("TRACE", genericType);
-        }
-        
-        @Override
-        public Response method(String method) {
-            return method(method, Response.class);
-        }
-
-        @Override
-        public <T> T method(String method, Class<T> cls) {
-            return invoke(method, null, cls);
-        }
-
-        @Override
-        public <T> T method(String method, GenericType<T> genericType) {
-            return invoke(method, null, genericType);
-        }
-
-        @Override
-        public Response method(String method, Entity<?> entity) {
-            return method(method, entity, Response.class);
-        }
-
-        @Override
-        public <T> T method(String method, Entity<?> entity, Class<T> cls) {
-            return invoke(method, entity, cls);
-        }
-
-        @Override
-        public <T> T method(String method, Entity<?> entity, GenericType<T> genericType) {
-            return invoke(method, entity, genericType);
-        }
-    }
-    
-    class CompletionStageRxInvokerImpl implements CompletionStageRxInvoker {
-        private ExecutorService ex;
-        CompletionStageRxInvokerImpl(ExecutorService ex) {
-            this.ex = ex;
-        }
-        
-        @Override
-        public CompletionStage<Response> get() {
-            return get(Response.class);
-        }
-
-        @Override
-        public <T> CompletionStage<T> get(Class<T> responseType) {
-            return method(HttpMethod.GET, responseType);
-        }
-
-        @Override
-        public <T> CompletionStage<T> get(GenericType<T> responseType) {
-            return method(HttpMethod.GET, responseType);
-        }
-
-        @Override
-        public CompletionStage<Response> put(Entity<?> entity) {
-            return put(entity, Response.class);
-        }
-
-        @Override
-        public <T> CompletionStage<T> put(Entity<?> entity, Class<T> responseType) {
-            return method(HttpMethod.PUT, entity, responseType);
-        }
-
-        @Override
-        public <T> CompletionStage<T> put(Entity<?> entity, GenericType<T> responseType) {
-            return method(HttpMethod.PUT, entity, responseType);
-        }
-
-        @Override
-        public CompletionStage<Response> post(Entity<?> entity) {
-            return post(entity, Response.class);
-        }
-
-        @Override
-        public <T> CompletionStage<T> post(Entity<?> entity, Class<T> responseType) {
-            return method(HttpMethod.POST, entity, responseType);
-        }
-
-        @Override
-        public <T> CompletionStage<T> post(Entity<?> entity, GenericType<T> responseType) {
-            return method(HttpMethod.POST, entity, responseType);
-        }
-
-        @Override
-        public CompletionStage<Response> delete() {
-            return delete(Response.class);
-        }
-
-        @Override
-        public <T> CompletionStage<T> delete(Class<T> responseType) {
-            return method(HttpMethod.DELETE, responseType);
-        }
-
-        @Override
-        public <T> CompletionStage<T> delete(GenericType<T> responseType) {
-            return method(HttpMethod.DELETE, responseType);
-        }
-
-        @Override
-        public CompletionStage<Response> head() {
-            return method(HttpMethod.HEAD);
-        }
-
-        @Override
-        public CompletionStage<Response> options() {
-            return options(Response.class);
-        }
-
-        @Override
-        public <T> CompletionStage<T> options(Class<T> responseType) {
-            return method(HttpMethod.OPTIONS, responseType);
-        }
-
-        @Override
-        public <T> CompletionStage<T> options(GenericType<T> responseType) {
-            return method(HttpMethod.OPTIONS, responseType);
-        }
-
-        @Override
-        public CompletionStage<Response> trace() {
-            return trace(Response.class);
-        }
-
-        @Override
-        public <T> CompletionStage<T> trace(Class<T> responseType) {
-            return method("TRACE", responseType);
-        }
-
-        @Override
-        public <T> CompletionStage<T> trace(GenericType<T> responseType) {
-            return method("TRACE", responseType);
-        }
-
-        @Override
-        public CompletionStage<Response> method(String name) {
-            return method(name, Response.class);
-        }
-
-        @Override
-        public CompletionStage<Response> method(String name, Entity<?> entity) {
-            return method(name, entity, Response.class);
-        }
-
-        @Override
-        public <T> CompletionStage<T> method(String name, Entity<?> entity, Class<T> responseType) {
-            return doInvokeAsyncStage(name, entity, responseType, responseType, ex);
-        }
-
-        @Override
-        public <T> CompletionStage<T> method(String name, Entity<?> entity, GenericType<T> responseType) {
-            return doInvokeAsyncStage(name, entity, responseType.getRawType(), responseType.getType(), ex);
-        }
-
-        @Override
-        public <T> CompletionStage<T> method(String name, Class<T> responseType) {
-            return doInvokeAsyncStage(name, null, responseType, responseType, ex);
-        }
-
-        @Override
-        public <T> CompletionStage<T> method(String name, GenericType<T> responseType) {
-            return doInvokeAsyncStage(name, null, responseType.getRawType(), responseType.getType(), ex);
-        }
-             
-    }
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/ea8b6e27/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/InvocationBuilderImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/InvocationBuilderImpl.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/InvocationBuilderImpl.java
index ea84b46..b8c870a 100644
--- a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/InvocationBuilderImpl.java
+++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/InvocationBuilderImpl.java
@@ -26,7 +26,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
 import javax.ws.rs.HttpMethod;
-import javax.ws.rs.ProcessingException;
 import javax.ws.rs.client.AsyncInvoker;
 import javax.ws.rs.client.CompletionStageRxInvoker;
 import javax.ws.rs.client.Entity;
@@ -35,7 +34,6 @@ import javax.ws.rs.client.Invocation.Builder;
 import javax.ws.rs.client.InvocationCallback;
 import javax.ws.rs.client.NioInvoker;
 import javax.ws.rs.client.RxInvoker;
-import javax.ws.rs.client.RxInvokerProvider;
 import javax.ws.rs.client.SyncInvoker;
 import javax.ws.rs.core.CacheControl;
 import javax.ws.rs.core.Cookie;
@@ -384,40 +382,25 @@ public class InvocationBuilderImpl implements Invocation.Builder {
 
     @Override
     public CompletionStageRxInvoker rx(ExecutorService executorService) {
+        // TODO: At the moment we still delegate if possible to the async HTTP conduit.
+        // Investigate if letting the CompletableFuture thread pool deal with the sync invocation
+        // is indeed more effective
+        
         return webClient.rx(executorService);
     }
 
     
     @SuppressWarnings("rawtypes")
     @Override
-    public <T extends RxInvoker> T rx(Class<? extends RxInvokerProvider<T>> pClass) {
-        return rx(pClass, (ExecutorService)null);
+    public <T extends RxInvoker> T rx(Class<T> rxCls) {
+        return rx(rxCls, (ExecutorService)null);
     }
 
     @SuppressWarnings("rawtypes")
     @Override
-    public <T extends RxInvoker> T rx(Class<? extends RxInvokerProvider<T>> pClass, ExecutorService execService) {
-        RxInvokerProvider<T> p = null;
-        try {
-            p = pClass.newInstance();
-        } catch (Throwable t) {
-            throw new ProcessingException(t);
-        }
-        return rx(p, execService);
-    }
-    
-    @SuppressWarnings("rawtypes")
-    @Override
-    public <T extends RxInvoker> T rx(RxInvokerProvider<T> p) {
-        return rx(p, (ExecutorService)null);
+    public <T extends RxInvoker> T rx(Class<T> rxCls, ExecutorService executorService) {
+        return webClient.rx(rxCls, executorService);
     }
-
-    @SuppressWarnings("rawtypes")
-    @Override
-    public <T extends RxInvoker> T rx(RxInvokerProvider<T> p, ExecutorService execService) {
-        return p.getRxInvoker(this, execService);
-    }
-    
     
     @Override
     public NioInvoker nio() {

http://git-wip-us.apache.org/repos/asf/cxf/blob/ea8b6e27/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/ObservableRxInvokerProvider.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/ObservableRxInvokerProvider.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/ObservableRxInvokerProvider.java
index 100fca2..63f60a6 100644
--- a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/ObservableRxInvokerProvider.java
+++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/ObservableRxInvokerProvider.java
@@ -20,16 +20,24 @@ package org.apache.cxf.jaxrs.rx.client;
 
 import java.util.concurrent.ExecutorService;
 
-import javax.ws.rs.client.Invocation;
 import javax.ws.rs.client.RxInvokerProvider;
+import javax.ws.rs.client.SyncInvoker;
 
-import org.apache.cxf.jaxrs.client.spec.InvocationBuilderImpl;
+import org.apache.cxf.jaxrs.client.SyncInvokerImpl;
 
 public class ObservableRxInvokerProvider implements RxInvokerProvider<ObservableRxInvoker> {
 
     @Override
-    public ObservableRxInvoker getRxInvoker(Invocation.Builder builder, ExecutorService execService) {
-        return new ObservableRxInvokerImpl(((InvocationBuilderImpl)builder).getWebClient(), execService);
+    public ObservableRxInvoker getRxInvoker(SyncInvoker syncInvoker, ExecutorService executorService) {
+        // TODO: At the moment we still delegate if possible to the async HTTP conduit.
+        // Investigate if letting the RxJava thread pool deal with the sync invocation
+        // is indeed more effective 
+        return new ObservableRxInvokerImpl(((SyncInvokerImpl)syncInvoker).getWebClient(), executorService);
+    }
+
+    @Override
+    public boolean isProviderFor(Class<?> rxCls) {
+        return ObservableRxInvoker.class == rxCls;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/ea8b6e27/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java
index ec25b17..0508d67 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java
@@ -32,6 +32,7 @@ import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
 
 import org.apache.cxf.jaxrs.client.WebClient;
 import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
+import org.apache.cxf.jaxrs.rx.client.ObservableRxInvoker;
 import org.apache.cxf.jaxrs.rx.client.ObservableRxInvokerProvider;
 import org.apache.cxf.jaxrs.rx.provider.ObservableReader;
 import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
@@ -127,9 +128,10 @@ public class JAXRSReactiveTest extends AbstractBusClientServerTestBase {
     @Test
     public void testGetHelloWorldAsyncObservable() throws Exception {
         String address = "http://localhost:" + PORT + "/reactive/textAsync";
-        WebClient wc = WebClient.create(address);
+        WebClient wc = WebClient.create(address,
+                                        Collections.singletonList(new ObservableRxInvokerProvider()));
         Observable<String> obs = wc.accept("text/plain")
-            .rx(new ObservableRxInvokerProvider())
+            .rx(ObservableRxInvoker.class)
             .get(String.class);
         obs.map(s -> { 
             return s + s; 
@@ -142,8 +144,9 @@ public class JAXRSReactiveTest extends AbstractBusClientServerTestBase {
     @Test
     public void testGetHelloWorldAsyncObservable404() throws Exception {
         String address = "http://localhost:" + PORT + "/reactive/textAsync404";
-        Invocation.Builder b = ClientBuilder.newClient().target(address).request();
-        b.rx(ObservableRxInvokerProvider.class).get(String.class).subscribe(
+        Invocation.Builder b = ClientBuilder.newClient().register(new ObservableRxInvokerProvider())
+            .target(address).request();
+        b.rx(ObservableRxInvoker.class).get(String.class).subscribe(
             s -> {
                 fail("Exception expected");
             },


Mime
View raw message