Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id AE70B2004F5 for ; Fri, 1 Sep 2017 12:01:13 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id ACCAC16CC1A; Fri, 1 Sep 2017 10:01:13 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id CB1DB16CC17 for ; Fri, 1 Sep 2017 12:01:12 +0200 (CEST) Received: (qmail 32519 invoked by uid 500); 1 Sep 2017 10:01:12 -0000 Mailing-List: contact commits-help@cxf.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cxf.apache.org Delivered-To: mailing list commits@cxf.apache.org Received: (qmail 32510 invoked by uid 99); 1 Sep 2017 10:01:10 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 01 Sep 2017 10:01:10 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 91282F3292; Fri, 1 Sep 2017 10:01:08 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sergeyb@apache.org To: commits@cxf.apache.org Message-Id: <4c15603713244b70827bf7c64b1e9e5b@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: cxf git commit: Having a single invoker only for RxJava2 Date: Fri, 1 Sep 2017 10:01:08 +0000 (UTC) archived-at: Fri, 01 Sep 2017 10:01:13 -0000 Repository: cxf Updated Branches: refs/heads/master 0f3e34689 -> 18dd0e1c7 Having a single invoker only for RxJava2 Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/18dd0e1c Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/18dd0e1c Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/18dd0e1c Branch: refs/heads/master Commit: 18dd0e1c754fd30c9ab479cf123db5ab8f6810e7 Parents: 0f3e346 Author: Sergey Beryozkin Authored: Fri Sep 1 11:00:53 2017 +0100 Committer: Sergey Beryozkin Committed: Fri Sep 1 11:00:53 2017 +0100 ---------------------------------------------------------------------- .../cxf/jaxrs/rx2/server/FlowableInvoker.java | 43 --------------- .../cxf/jaxrs/rx2/server/ObservableInvoker.java | 43 --------------- .../cxf/jaxrs/rx2/server/ReactiveIOInvoker.java | 57 ++++++++++++++++++++ .../jaxrs/reactive/RxJava2FlowableServer.java | 4 +- .../jaxrs/reactive/RxJava2ObservableServer.java | 4 +- 5 files changed, 61 insertions(+), 90 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/18dd0e1c/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/FlowableInvoker.java ---------------------------------------------------------------------- diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/FlowableInvoker.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/FlowableInvoker.java deleted file mode 100644 index 1ff7491..0000000 --- a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/FlowableInvoker.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cxf.jaxrs.rx2.server; - -import org.apache.cxf.jaxrs.JAXRSInvoker; -import org.apache.cxf.jaxrs.impl.AsyncResponseImpl; -import org.apache.cxf.message.Message; - -import io.reactivex.Flowable; - -public class FlowableInvoker extends JAXRSInvoker { - protected AsyncResponseImpl checkFutureResponse(Message inMessage, Object result) { - if (result instanceof Flowable) { - final Flowable f = (Flowable)result; - final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage); - f.subscribe(v -> asyncResponse.resume(v), t -> handleThrowable(asyncResponse, t)); - return asyncResponse; - } - return null; - } - - private Object handleThrowable(AsyncResponseImpl asyncResponse, Throwable t) { - //TODO: if it is a Cancelation exception => asyncResponse.cancel(); - asyncResponse.resume(t); - return null; - } -} http://git-wip-us.apache.org/repos/asf/cxf/blob/18dd0e1c/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ObservableInvoker.java ---------------------------------------------------------------------- diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ObservableInvoker.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ObservableInvoker.java deleted file mode 100644 index 8047c6a..0000000 --- a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ObservableInvoker.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cxf.jaxrs.rx2.server; - -import org.apache.cxf.jaxrs.JAXRSInvoker; -import org.apache.cxf.jaxrs.impl.AsyncResponseImpl; -import org.apache.cxf.message.Message; - -import io.reactivex.Observable; - -public class ObservableInvoker extends JAXRSInvoker { - protected AsyncResponseImpl checkFutureResponse(Message inMessage, Object result) { - if (result instanceof Observable) { - final Observable obs = (Observable)result; - final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage); - obs.subscribe(v -> asyncResponse.resume(v), t -> handleThrowable(asyncResponse, t)); - return asyncResponse; - } - return null; - } - - private Object handleThrowable(AsyncResponseImpl asyncResponse, Throwable t) { - //TODO: if it is a Cancelation exception => asyncResponse.cancel(); - asyncResponse.resume(t); - return null; - } -} http://git-wip-us.apache.org/repos/asf/cxf/blob/18dd0e1c/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java ---------------------------------------------------------------------- diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java new file mode 100644 index 0000000..c529d4a --- /dev/null +++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.jaxrs.rx2.server; + +import org.apache.cxf.jaxrs.JAXRSInvoker; +import org.apache.cxf.jaxrs.impl.AsyncResponseImpl; +import org.apache.cxf.message.Message; + +import io.reactivex.Flowable; +import io.reactivex.Observable; + +//Work in Progress +public class ReactiveIOInvoker extends JAXRSInvoker { + protected AsyncResponseImpl checkFutureResponse(Message inMessage, Object result) { + if (result instanceof Flowable) { + return handleFlowable(inMessage, (Flowable)result); + } else if (result instanceof Observable) { + return handleObservable(inMessage, (Observable)result); + } else { + return null; + } + } + + protected AsyncResponseImpl handleFlowable(Message inMessage, Flowable f) { + final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage); + f.subscribe(v -> asyncResponse.resume(v), t -> handleThrowable(asyncResponse, t)); + return asyncResponse; + } + + protected AsyncResponseImpl handleObservable(Message inMessage, Observable obs) { + final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage); + obs.subscribe(v -> asyncResponse.resume(v), t -> handleThrowable(asyncResponse, t)); + return asyncResponse; + } + + private Object handleThrowable(AsyncResponseImpl asyncResponse, Throwable t) { + //TODO: if it is a Cancelation exception => asyncResponse.cancel(); + asyncResponse.resume(t); + return null; + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/18dd0e1c/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableServer.java ---------------------------------------------------------------------- diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableServer.java index fe41958..8558bed 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableServer.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableServer.java @@ -29,7 +29,7 @@ import org.apache.cxf.ext.logging.LoggingOutInterceptor; import org.apache.cxf.jaxrs.JAXRSServerFactoryBean; import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider; import org.apache.cxf.jaxrs.provider.StreamingResponseProvider; -import org.apache.cxf.jaxrs.rx2.server.FlowableInvoker; +import org.apache.cxf.jaxrs.rx2.server.ReactiveIOInvoker; import org.apache.cxf.testutil.common.AbstractBusTestServerBase; @@ -45,7 +45,7 @@ public class RxJava2FlowableServer extends AbstractBusTestServerBase { // Make sure default JSONProvider is not loaded bus.setProperty("skip.default.json.provider.registration", true); JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean(); - sf.setInvoker(new FlowableInvoker()); + sf.setInvoker(new ReactiveIOInvoker()); sf.setProvider(new JacksonJsonProvider()); StreamingResponseProvider streamProvider = new StreamingResponseProvider(); streamProvider.setProduceMediaTypes(Collections.singletonList("application/json")); http://git-wip-us.apache.org/repos/asf/cxf/blob/18dd0e1c/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableServer.java ---------------------------------------------------------------------- diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableServer.java index 48df030..a8849d1 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableServer.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableServer.java @@ -26,7 +26,7 @@ import org.apache.cxf.BusFactory; import org.apache.cxf.ext.logging.LoggingOutInterceptor; import org.apache.cxf.jaxrs.JAXRSServerFactoryBean; import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider; -import org.apache.cxf.jaxrs.rx2.server.ObservableInvoker; +import org.apache.cxf.jaxrs.rx2.server.ReactiveIOInvoker; import org.apache.cxf.testutil.common.AbstractBusTestServerBase; @@ -42,7 +42,7 @@ public class RxJava2ObservableServer extends AbstractBusTestServerBase { // Make sure default JSONProvider is not loaded bus.setProperty("skip.default.json.provider.registration", true); JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean(); - sf.setInvoker(new ObservableInvoker()); + sf.setInvoker(new ReactiveIOInvoker()); sf.setProvider(new JacksonJsonProvider()); sf.getOutInterceptors().add(new LoggingOutInterceptor()); sf.setResourceClasses(RxJava2ObservableService.class);