Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id ECBC0200B9C for ; Mon, 5 Sep 2016 03:10:12 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id EB778160AB9; Mon, 5 Sep 2016 01:10:12 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 41358160AD8 for ; Mon, 5 Sep 2016 03:10:11 +0200 (CEST) Received: (qmail 41767 invoked by uid 500); 5 Sep 2016 01:10:09 -0000 Mailing-List: contact commits-help@cxf.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cxf.apache.org Delivered-To: mailing list commits@cxf.apache.org Received: (qmail 40782 invoked by uid 99); 5 Sep 2016 01:10:09 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 05 Sep 2016 01:10:09 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 04B7BE5CE1; Mon, 5 Sep 2016 01:10:09 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: reta@apache.org To: commits@cxf.apache.org Date: Mon, 05 Sep 2016 01:10:19 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [12/23] cxf git commit: [CXF-6833] Package updates archived-at: Mon, 05 Sep 2016 01:10:13 -0000 [CXF-6833] Package updates Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/6fcdc7e9 Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/6fcdc7e9 Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/6fcdc7e9 Branch: refs/heads/master-jaxrs-2.1 Commit: 6fcdc7e9eec2414a40bd3161cf237ef182e313a6 Parents: 6d82b75 Author: Sergey Beryozkin Authored: Fri Sep 2 16:27:40 2016 +0100 Committer: Sergey Beryozkin Committed: Fri Sep 2 16:27:40 2016 +0100 ---------------------------------------------------------------------- .../cxf/jaxrs/rx/AbstractAsyncSubscriber.java | 56 --------- .../jaxrs/rx/JsonStreamingAsyncSubscriber.java | 34 ----- .../cxf/jaxrs/rx/ListAsyncSubscriber.java | 42 ------- .../apache/cxf/jaxrs/rx/ObservableReader.java | 61 --------- .../apache/cxf/jaxrs/rx/ObservableWriter.java | 119 ------------------ .../cxf/jaxrs/rx/StreamingAsyncSubscriber.java | 124 ------------------- .../cxf/jaxrs/rx/provider/ObservableReader.java | 61 +++++++++ .../cxf/jaxrs/rx/provider/ObservableWriter.java | 119 ++++++++++++++++++ .../rx/server/AbstractAsyncSubscriber.java | 56 +++++++++ .../rx/server/JsonStreamingAsyncSubscriber.java | 34 +++++ .../jaxrs/rx/server/ListAsyncSubscriber.java | 42 +++++++ .../rx/server/StreamingAsyncSubscriber.java | 124 +++++++++++++++++++ .../cxf/jaxrs/rx/ObservableWriterTest.java | 32 ----- .../jaxrs/rx/provider/ObservableWriterTest.java | 32 +++++ .../jaxrs/reactive/JAXRSReactiveTest.java | 2 +- .../systest/jaxrs/reactive/ReactiveServer.java | 2 +- .../systest/jaxrs/reactive/ReactiveService.java | 6 +- 17 files changed, 473 insertions(+), 473 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/6fcdc7e9/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/AbstractAsyncSubscriber.java ---------------------------------------------------------------------- diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/AbstractAsyncSubscriber.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/AbstractAsyncSubscriber.java deleted file mode 100644 index 80b1592..0000000 --- a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/AbstractAsyncSubscriber.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cxf.jaxrs.rx; - -import java.util.List; - -import javax.ws.rs.container.AsyncResponse; - -import org.apache.cxf.jaxrs.ext.StreamingResponse; - -import rx.Subscriber; - -public abstract class AbstractAsyncSubscriber extends Subscriber { - - private AsyncResponse ar; - - protected AbstractAsyncSubscriber(AsyncResponse ar) { - this.ar = ar; - } - public void resume(T response) { - ar.resume(response); - } - - public void resume(List response) { - ar.resume(response); - } - - public void resume(StreamingResponse response) { - ar.resume(response); - } - - @Override - public void onError(Throwable t) { - ar.resume(t); - } - - protected AsyncResponse getAsyncResponse() { - return ar; - } -} http://git-wip-us.apache.org/repos/asf/cxf/blob/6fcdc7e9/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/JsonStreamingAsyncSubscriber.java ---------------------------------------------------------------------- diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/JsonStreamingAsyncSubscriber.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/JsonStreamingAsyncSubscriber.java deleted file mode 100644 index b5c22a4..0000000 --- a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/JsonStreamingAsyncSubscriber.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cxf.jaxrs.rx; - -import javax.ws.rs.container.AsyncResponse; - -public class JsonStreamingAsyncSubscriber extends StreamingAsyncSubscriber { - public JsonStreamingAsyncSubscriber(AsyncResponse ar) { - this(ar, 1000); - } - public JsonStreamingAsyncSubscriber(AsyncResponse ar, long pollTimeout) { - this(ar, pollTimeout, 0); - } - public JsonStreamingAsyncSubscriber(AsyncResponse ar, long pollTimeout, long asyncTimeout) { - super(ar, "[", "]", ",", pollTimeout, asyncTimeout); - } - -} http://git-wip-us.apache.org/repos/asf/cxf/blob/6fcdc7e9/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ListAsyncSubscriber.java ---------------------------------------------------------------------- diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ListAsyncSubscriber.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ListAsyncSubscriber.java deleted file mode 100644 index e94a861..0000000 --- a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ListAsyncSubscriber.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cxf.jaxrs.rx; - -import java.util.LinkedList; -import java.util.List; - -import javax.ws.rs.container.AsyncResponse; - -public class ListAsyncSubscriber extends AbstractAsyncSubscriber { - - private List beans = new LinkedList(); - public ListAsyncSubscriber(AsyncResponse ar) { - super(ar); - } - @Override - public void onCompleted() { - super.resume(beans); - } - - @Override - public void onNext(T bean) { - beans.add(bean); - } - -} http://git-wip-us.apache.org/repos/asf/cxf/blob/6fcdc7e9/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ObservableReader.java ---------------------------------------------------------------------- diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ObservableReader.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ObservableReader.java deleted file mode 100644 index 0e0780a..0000000 --- a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ObservableReader.java +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cxf.jaxrs.rx; - -import java.io.IOException; -import java.io.InputStream; -import java.lang.annotation.Annotation; -import java.lang.reflect.Type; - -import javax.ws.rs.ProcessingException; -import javax.ws.rs.WebApplicationException; -import javax.ws.rs.core.Context; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.MultivaluedMap; -import javax.ws.rs.ext.MessageBodyReader; -import javax.ws.rs.ext.Providers; - -import org.apache.cxf.jaxrs.utils.InjectionUtils; - -import rx.Observable; - -public class ObservableReader implements MessageBodyReader> { - - @Context - private Providers providers; - - @Override - public boolean isReadable(Class arg0, Type arg1, Annotation[] arg2, MediaType arg3) { - return true; - } - - @Override - public Observable readFrom(Class> cls, Type t, Annotation[] anns, MediaType mt, - MultivaluedMap headers, InputStream is) - throws IOException, WebApplicationException { - @SuppressWarnings("unchecked") - Class actualCls = (Class)InjectionUtils.getActualType(t); - final MessageBodyReader mbr = - (MessageBodyReader)providers.getMessageBodyReader(actualCls, actualCls, anns, mt); - if (mbr == null) { - throw new ProcessingException("MBR is null"); - } - return Observable.just(mbr.readFrom(actualCls, actualCls, anns, mt, headers, is)); - } -} http://git-wip-us.apache.org/repos/asf/cxf/blob/6fcdc7e9/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ObservableWriter.java ---------------------------------------------------------------------- diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ObservableWriter.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ObservableWriter.java deleted file mode 100644 index 475d36b..0000000 --- a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/ObservableWriter.java +++ /dev/null @@ -1,119 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cxf.jaxrs.rx; - -import java.io.IOException; -import java.io.OutputStream; -import java.lang.annotation.Annotation; -import java.lang.reflect.Type; -import java.util.LinkedList; -import java.util.List; - -import javax.ws.rs.WebApplicationException; -import javax.ws.rs.core.Context; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.MultivaluedMap; -import javax.ws.rs.ext.MessageBodyWriter; -import javax.ws.rs.ext.Provider; -import javax.ws.rs.ext.Providers; - -import org.apache.cxf.jaxrs.utils.ExceptionUtils; -import org.apache.cxf.jaxrs.utils.ParameterizedCollectionType; - -import rx.Observable; - -@Provider -public class ObservableWriter implements MessageBodyWriter> { - - @Context - private Providers providers; - private boolean writeSingleElementAsList; - - @Override - public long getSize(Observable arg0, Class arg1, Type arg2, Annotation[] arg3, MediaType arg4) { - // TODO Auto-generated method stub - return -1; - } - - @Override - public boolean isWriteable(Class arg0, Type arg1, Annotation[] arg2, MediaType arg3) { - return true; - } - - @Override - public void writeTo(Observable obs, Class cls, Type t, Annotation[] anns, MediaType mt, - MultivaluedMap headers, OutputStream os) - throws IOException, WebApplicationException { - List entities = new LinkedList(); - obs.subscribe(value -> entities.add(value), - throwable -> throwError(throwable)); - if (!entities.isEmpty()) { - - if (entities.get(0) instanceof List) { - List allEntities = new LinkedList(); - for (T obj : entities) { - @SuppressWarnings("unchecked") - List listT = (List)obj; - allEntities.addAll(listT); - } - writeToOutputStream(allEntities, anns, mt, headers, os); - } else if (entities.size() > 1 || writeSingleElementAsList) { - writeToOutputStream(entities, anns, mt, headers, os); - } else { - writeToOutputStream(entities.get(0), anns, mt, headers, os); - } - } - } - - private void writeToOutputStream(Object value, - Annotation[] anns, - MediaType mt, - MultivaluedMap headers, - OutputStream os) { - Class valueCls = value.getClass(); - Type valueType = null; - if (value instanceof List) { - List list = (List)value; - valueType = new ParameterizedCollectionType(list.isEmpty() ? Object.class : list.get(0).getClass()); - } else { - valueType = valueCls; - } - @SuppressWarnings("unchecked") - MessageBodyWriter writer = - (MessageBodyWriter)providers.getMessageBodyWriter(valueCls, valueType, anns, mt); - if (writer == null) { - throwError(null); - } - - try { - writer.writeTo(value, valueCls, valueType, anns, mt, headers, os); - } catch (IOException ex) { - throwError(ex); - } - } - - private static void throwError(Throwable cause) { - throw ExceptionUtils.toInternalServerErrorException(cause, null); - } - - public void setWriteSingleElementAsList(boolean writeSingleElementAsList) { - this.writeSingleElementAsList = writeSingleElementAsList; - } - -} http://git-wip-us.apache.org/repos/asf/cxf/blob/6fcdc7e9/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/StreamingAsyncSubscriber.java ---------------------------------------------------------------------- diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/StreamingAsyncSubscriber.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/StreamingAsyncSubscriber.java deleted file mode 100644 index c531e98..0000000 --- a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/StreamingAsyncSubscriber.java +++ /dev/null @@ -1,124 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cxf.jaxrs.rx; - -import java.io.IOException; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import javax.ws.rs.container.AsyncResponse; -import javax.ws.rs.container.TimeoutHandler; - -import org.apache.cxf.common.util.StringUtils; -import org.apache.cxf.jaxrs.ext.StreamingResponse; - -public class StreamingAsyncSubscriber extends AbstractAsyncSubscriber { - - private BlockingQueue queue = new LinkedBlockingQueue(); - private String openTag; - private String closeTag; - private String separator; - private long pollTimeout; - private long asyncTimeout; - private volatile boolean completed; - private AtomicBoolean firstWriteDone = new AtomicBoolean(); - public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep) { - this(ar, openTag, closeTag, "", 1000); - } - public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep, - long pollTimeout) { - this(ar, openTag, closeTag, sep, pollTimeout, 0); - } - public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep, - long pollTimeout, long asyncTimeout) { - super(ar); - this.openTag = openTag; - this.closeTag = closeTag; - this.separator = sep; - this.pollTimeout = pollTimeout; - this.asyncTimeout = 0; - if (asyncTimeout > 0) { - ar.setTimeout(asyncTimeout, TimeUnit.MILLISECONDS); - ar.setTimeoutHandler(new TimeoutHandlerImpl()); - } - } - @Override - public void onStart() { - if (asyncTimeout == 0) { - resumeAsyncResponse(); - } - } - private void resumeAsyncResponse() { - super.resume(new StreamingResponseImpl()); - } - @Override - public void onCompleted() { - completed = true; - } - - @Override - public void onNext(T bean) { - if (asyncTimeout > 0 && getAsyncResponse().isSuspended()) { - resumeAsyncResponse(); - } - queue.add(bean); - } - private class StreamingResponseImpl implements StreamingResponse { - - @Override - public void writeTo(Writer writer) throws IOException { - if (openTag != null) { - writer.getEntityStream().write(StringUtils.toBytesUTF8(openTag)); - } - while (!completed || queue.size() > 0) { - try { - T bean = queue.poll(pollTimeout, TimeUnit.MILLISECONDS); - if (bean != null) { - if (firstWriteDone.getAndSet(true)) { - writer.getEntityStream().write(StringUtils.toBytesUTF8(separator)); - } - writer.write(bean); - } - } catch (InterruptedException ex) { - // ignore - } - } - if (closeTag != null) { - writer.getEntityStream().write(StringUtils.toBytesUTF8(closeTag)); - } - - } - - } - public class TimeoutHandlerImpl implements TimeoutHandler { - - @Override - public void handleTimeout(AsyncResponse asyncResponse) { - if (queue.isEmpty()) { - asyncResponse.setTimeout(asyncTimeout, TimeUnit.MILLISECONDS); - } else { - resumeAsyncResponse(); - } - - } - - } -} http://git-wip-us.apache.org/repos/asf/cxf/blob/6fcdc7e9/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/provider/ObservableReader.java ---------------------------------------------------------------------- diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/provider/ObservableReader.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/provider/ObservableReader.java new file mode 100644 index 0000000..f05f478 --- /dev/null +++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/provider/ObservableReader.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.jaxrs.rx.provider; + +import java.io.IOException; +import java.io.InputStream; +import java.lang.annotation.Annotation; +import java.lang.reflect.Type; + +import javax.ws.rs.ProcessingException; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedMap; +import javax.ws.rs.ext.MessageBodyReader; +import javax.ws.rs.ext.Providers; + +import org.apache.cxf.jaxrs.utils.InjectionUtils; + +import rx.Observable; + +public class ObservableReader implements MessageBodyReader> { + + @Context + private Providers providers; + + @Override + public boolean isReadable(Class arg0, Type arg1, Annotation[] arg2, MediaType arg3) { + return true; + } + + @Override + public Observable readFrom(Class> cls, Type t, Annotation[] anns, MediaType mt, + MultivaluedMap headers, InputStream is) + throws IOException, WebApplicationException { + @SuppressWarnings("unchecked") + Class actualCls = (Class)InjectionUtils.getActualType(t); + final MessageBodyReader mbr = + (MessageBodyReader)providers.getMessageBodyReader(actualCls, actualCls, anns, mt); + if (mbr == null) { + throw new ProcessingException("MBR is null"); + } + return Observable.just(mbr.readFrom(actualCls, actualCls, anns, mt, headers, is)); + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/6fcdc7e9/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/provider/ObservableWriter.java ---------------------------------------------------------------------- diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/provider/ObservableWriter.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/provider/ObservableWriter.java new file mode 100644 index 0000000..b4ed2af --- /dev/null +++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/provider/ObservableWriter.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.jaxrs.rx.provider; + +import java.io.IOException; +import java.io.OutputStream; +import java.lang.annotation.Annotation; +import java.lang.reflect.Type; +import java.util.LinkedList; +import java.util.List; + +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedMap; +import javax.ws.rs.ext.MessageBodyWriter; +import javax.ws.rs.ext.Provider; +import javax.ws.rs.ext.Providers; + +import org.apache.cxf.jaxrs.utils.ExceptionUtils; +import org.apache.cxf.jaxrs.utils.ParameterizedCollectionType; + +import rx.Observable; + +@Provider +public class ObservableWriter implements MessageBodyWriter> { + + @Context + private Providers providers; + private boolean writeSingleElementAsList; + + @Override + public long getSize(Observable arg0, Class arg1, Type arg2, Annotation[] arg3, MediaType arg4) { + // TODO Auto-generated method stub + return -1; + } + + @Override + public boolean isWriteable(Class arg0, Type arg1, Annotation[] arg2, MediaType arg3) { + return true; + } + + @Override + public void writeTo(Observable obs, Class cls, Type t, Annotation[] anns, MediaType mt, + MultivaluedMap headers, OutputStream os) + throws IOException, WebApplicationException { + List entities = new LinkedList(); + obs.subscribe(value -> entities.add(value), + throwable -> throwError(throwable)); + if (!entities.isEmpty()) { + + if (entities.get(0) instanceof List) { + List allEntities = new LinkedList(); + for (T obj : entities) { + @SuppressWarnings("unchecked") + List listT = (List)obj; + allEntities.addAll(listT); + } + writeToOutputStream(allEntities, anns, mt, headers, os); + } else if (entities.size() > 1 || writeSingleElementAsList) { + writeToOutputStream(entities, anns, mt, headers, os); + } else { + writeToOutputStream(entities.get(0), anns, mt, headers, os); + } + } + } + + private void writeToOutputStream(Object value, + Annotation[] anns, + MediaType mt, + MultivaluedMap headers, + OutputStream os) { + Class valueCls = value.getClass(); + Type valueType = null; + if (value instanceof List) { + List list = (List)value; + valueType = new ParameterizedCollectionType(list.isEmpty() ? Object.class : list.get(0).getClass()); + } else { + valueType = valueCls; + } + @SuppressWarnings("unchecked") + MessageBodyWriter writer = + (MessageBodyWriter)providers.getMessageBodyWriter(valueCls, valueType, anns, mt); + if (writer == null) { + throwError(null); + } + + try { + writer.writeTo(value, valueCls, valueType, anns, mt, headers, os); + } catch (IOException ex) { + throwError(ex); + } + } + + private static void throwError(Throwable cause) { + throw ExceptionUtils.toInternalServerErrorException(cause, null); + } + + public void setWriteSingleElementAsList(boolean writeSingleElementAsList) { + this.writeSingleElementAsList = writeSingleElementAsList; + } + +} http://git-wip-us.apache.org/repos/asf/cxf/blob/6fcdc7e9/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/AbstractAsyncSubscriber.java ---------------------------------------------------------------------- diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/AbstractAsyncSubscriber.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/AbstractAsyncSubscriber.java new file mode 100644 index 0000000..94a85fc --- /dev/null +++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/AbstractAsyncSubscriber.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.jaxrs.rx.server; + +import java.util.List; + +import javax.ws.rs.container.AsyncResponse; + +import org.apache.cxf.jaxrs.ext.StreamingResponse; + +import rx.Subscriber; + +public abstract class AbstractAsyncSubscriber extends Subscriber { + + private AsyncResponse ar; + + protected AbstractAsyncSubscriber(AsyncResponse ar) { + this.ar = ar; + } + public void resume(T response) { + ar.resume(response); + } + + public void resume(List response) { + ar.resume(response); + } + + public void resume(StreamingResponse response) { + ar.resume(response); + } + + @Override + public void onError(Throwable t) { + ar.resume(t); + } + + protected AsyncResponse getAsyncResponse() { + return ar; + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/6fcdc7e9/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/JsonStreamingAsyncSubscriber.java ---------------------------------------------------------------------- diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/JsonStreamingAsyncSubscriber.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/JsonStreamingAsyncSubscriber.java new file mode 100644 index 0000000..c71c8c1 --- /dev/null +++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/JsonStreamingAsyncSubscriber.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.jaxrs.rx.server; + +import javax.ws.rs.container.AsyncResponse; + +public class JsonStreamingAsyncSubscriber extends StreamingAsyncSubscriber { + public JsonStreamingAsyncSubscriber(AsyncResponse ar) { + this(ar, 1000); + } + public JsonStreamingAsyncSubscriber(AsyncResponse ar, long pollTimeout) { + this(ar, pollTimeout, 0); + } + public JsonStreamingAsyncSubscriber(AsyncResponse ar, long pollTimeout, long asyncTimeout) { + super(ar, "[", "]", ",", pollTimeout, asyncTimeout); + } + +} http://git-wip-us.apache.org/repos/asf/cxf/blob/6fcdc7e9/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/ListAsyncSubscriber.java ---------------------------------------------------------------------- diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/ListAsyncSubscriber.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/ListAsyncSubscriber.java new file mode 100644 index 0000000..5be73e5 --- /dev/null +++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/ListAsyncSubscriber.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.jaxrs.rx.server; + +import java.util.LinkedList; +import java.util.List; + +import javax.ws.rs.container.AsyncResponse; + +public class ListAsyncSubscriber extends AbstractAsyncSubscriber { + + private List beans = new LinkedList(); + public ListAsyncSubscriber(AsyncResponse ar) { + super(ar); + } + @Override + public void onCompleted() { + super.resume(beans); + } + + @Override + public void onNext(T bean) { + beans.add(bean); + } + +} http://git-wip-us.apache.org/repos/asf/cxf/blob/6fcdc7e9/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/StreamingAsyncSubscriber.java ---------------------------------------------------------------------- diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/StreamingAsyncSubscriber.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/StreamingAsyncSubscriber.java new file mode 100644 index 0000000..bd16292 --- /dev/null +++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/StreamingAsyncSubscriber.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.jaxrs.rx.server; + +import java.io.IOException; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.ws.rs.container.AsyncResponse; +import javax.ws.rs.container.TimeoutHandler; + +import org.apache.cxf.common.util.StringUtils; +import org.apache.cxf.jaxrs.ext.StreamingResponse; + +public class StreamingAsyncSubscriber extends AbstractAsyncSubscriber { + + private BlockingQueue queue = new LinkedBlockingQueue(); + private String openTag; + private String closeTag; + private String separator; + private long pollTimeout; + private long asyncTimeout; + private volatile boolean completed; + private AtomicBoolean firstWriteDone = new AtomicBoolean(); + public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep) { + this(ar, openTag, closeTag, "", 1000); + } + public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep, + long pollTimeout) { + this(ar, openTag, closeTag, sep, pollTimeout, 0); + } + public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep, + long pollTimeout, long asyncTimeout) { + super(ar); + this.openTag = openTag; + this.closeTag = closeTag; + this.separator = sep; + this.pollTimeout = pollTimeout; + this.asyncTimeout = 0; + if (asyncTimeout > 0) { + ar.setTimeout(asyncTimeout, TimeUnit.MILLISECONDS); + ar.setTimeoutHandler(new TimeoutHandlerImpl()); + } + } + @Override + public void onStart() { + if (asyncTimeout == 0) { + resumeAsyncResponse(); + } + } + private void resumeAsyncResponse() { + super.resume(new StreamingResponseImpl()); + } + @Override + public void onCompleted() { + completed = true; + } + + @Override + public void onNext(T bean) { + if (asyncTimeout > 0 && getAsyncResponse().isSuspended()) { + resumeAsyncResponse(); + } + queue.add(bean); + } + private class StreamingResponseImpl implements StreamingResponse { + + @Override + public void writeTo(Writer writer) throws IOException { + if (openTag != null) { + writer.getEntityStream().write(StringUtils.toBytesUTF8(openTag)); + } + while (!completed || queue.size() > 0) { + try { + T bean = queue.poll(pollTimeout, TimeUnit.MILLISECONDS); + if (bean != null) { + if (firstWriteDone.getAndSet(true)) { + writer.getEntityStream().write(StringUtils.toBytesUTF8(separator)); + } + writer.write(bean); + } + } catch (InterruptedException ex) { + // ignore + } + } + if (closeTag != null) { + writer.getEntityStream().write(StringUtils.toBytesUTF8(closeTag)); + } + + } + + } + public class TimeoutHandlerImpl implements TimeoutHandler { + + @Override + public void handleTimeout(AsyncResponse asyncResponse) { + if (queue.isEmpty()) { + asyncResponse.setTimeout(asyncTimeout, TimeUnit.MILLISECONDS); + } else { + resumeAsyncResponse(); + } + + } + + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/6fcdc7e9/rt/rs/extensions/rx/src/test/java/org/apache/cxf/jaxrs/rx/ObservableWriterTest.java ---------------------------------------------------------------------- diff --git a/rt/rs/extensions/rx/src/test/java/org/apache/cxf/jaxrs/rx/ObservableWriterTest.java b/rt/rs/extensions/rx/src/test/java/org/apache/cxf/jaxrs/rx/ObservableWriterTest.java deleted file mode 100644 index c6d0086..0000000 --- a/rt/rs/extensions/rx/src/test/java/org/apache/cxf/jaxrs/rx/ObservableWriterTest.java +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.cxf.jaxrs.rx; - -import org.junit.Assert; -import org.junit.Test; - -public class ObservableWriterTest extends Assert { - - - @Test - public void testIsWriteable() { - } - -} http://git-wip-us.apache.org/repos/asf/cxf/blob/6fcdc7e9/rt/rs/extensions/rx/src/test/java/org/apache/cxf/jaxrs/rx/provider/ObservableWriterTest.java ---------------------------------------------------------------------- diff --git a/rt/rs/extensions/rx/src/test/java/org/apache/cxf/jaxrs/rx/provider/ObservableWriterTest.java b/rt/rs/extensions/rx/src/test/java/org/apache/cxf/jaxrs/rx/provider/ObservableWriterTest.java new file mode 100644 index 0000000..049377b --- /dev/null +++ b/rt/rs/extensions/rx/src/test/java/org/apache/cxf/jaxrs/rx/provider/ObservableWriterTest.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.jaxrs.rx.provider; + +import org.junit.Assert; +import org.junit.Test; + +public class ObservableWriterTest extends Assert { + + + @Test + public void testIsWriteable() { + } + +} http://git-wip-us.apache.org/repos/asf/cxf/blob/6fcdc7e9/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java ---------------------------------------------------------------------- diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java index 2f4c496..59da436 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java @@ -29,7 +29,7 @@ import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider; import org.apache.cxf.jaxrs.client.WebClient; import org.apache.cxf.jaxrs.model.AbstractResourceInfo; -import org.apache.cxf.jaxrs.rx.ObservableReader; +import org.apache.cxf.jaxrs.rx.provider.ObservableReader; import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase; import org.junit.BeforeClass; http://git-wip-us.apache.org/repos/asf/cxf/blob/6fcdc7e9/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveServer.java ---------------------------------------------------------------------- diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveServer.java index 4915b71..afd5cbb 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveServer.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveServer.java @@ -29,7 +29,7 @@ import org.apache.cxf.interceptor.LoggingOutInterceptor; import org.apache.cxf.jaxrs.JAXRSServerFactoryBean; import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider; import org.apache.cxf.jaxrs.provider.StreamingResponseProvider; -import org.apache.cxf.jaxrs.rx.ObservableWriter; +import org.apache.cxf.jaxrs.rx.provider.ObservableWriter; import org.apache.cxf.testutil.common.AbstractBusTestServerBase; http://git-wip-us.apache.org/repos/asf/cxf/blob/6fcdc7e9/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveService.java ---------------------------------------------------------------------- diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveService.java index bac9472..9c9ae08 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveService.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveService.java @@ -29,9 +29,9 @@ import javax.ws.rs.Produces; import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.container.Suspended; -import org.apache.cxf.jaxrs.rx.AbstractAsyncSubscriber; -import org.apache.cxf.jaxrs.rx.JsonStreamingAsyncSubscriber; -import org.apache.cxf.jaxrs.rx.ListAsyncSubscriber; +import org.apache.cxf.jaxrs.rx.server.AbstractAsyncSubscriber; +import org.apache.cxf.jaxrs.rx.server.JsonStreamingAsyncSubscriber; +import org.apache.cxf.jaxrs.rx.server.ListAsyncSubscriber; import rx.Observable; import rx.schedulers.Schedulers;