Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 42E46200B80 for ; Wed, 14 Sep 2016 13:10:06 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 41580160ABA; Wed, 14 Sep 2016 11:10:06 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 6D050160AB4 for ; Wed, 14 Sep 2016 13:10:04 +0200 (CEST) Received: (qmail 32956 invoked by uid 500); 14 Sep 2016 11:10:03 -0000 Mailing-List: contact commits-help@aries.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@aries.apache.org Delivered-To: mailing list commits@aries.apache.org Received: (qmail 32945 invoked by uid 99); 14 Sep 2016 11:10:03 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 14 Sep 2016 11:10:03 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6CE26DFCC0; Wed, 14 Sep 2016 11:10:03 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: cschneider@apache.org To: commits@aries.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: aries-rsa git commit: [ARIES-1587] Support streams in fastbin Date: Wed, 14 Sep 2016 11:10:03 +0000 (UTC) archived-at: Wed, 14 Sep 2016 11:10:06 -0000 Repository: aries-rsa Updated Branches: refs/heads/master 4c8ae19f4 -> 475f01328 [ARIES-1587] Support streams in fastbin injects serializable proxy streams that read/write remotely Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/475f0132 Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/475f0132 Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/475f0132 Branch: refs/heads/master Commit: 475f01328983e3e1b11deb41876ad21eddff333c Parents: 4c8ae19 Author: Johannes Utzig Authored: Wed Jul 27 10:42:21 2016 +0200 Committer: Christian Schneider Committed: Tue Sep 13 16:40:49 2016 +0200 ---------------------------------------------------------------------- provider/fastbin/Readme.md | 6 + .../aries/rsa/provider/fastbin/Activator.java | 22 +- .../rsa/provider/fastbin/FastBinProvider.java | 8 + .../rsa/provider/fastbin/io/ServerInvoker.java | 3 + .../rsa/provider/fastbin/streams/Chunk.java | 67 ++++++ .../fastbin/streams/InputStreamProxy.java | 158 +++++++++++++ .../fastbin/streams/OutputStreamProxy.java | 152 ++++++++++++ .../fastbin/streams/StreamProvider.java | 75 ++++++ .../fastbin/streams/StreamProviderImpl.java | 113 +++++++++ .../fastbin/tcp/AbstractInvocationStrategy.java | 36 +++ .../fastbin/tcp/BlockingInvocationStrategy.java | 3 + .../provider/fastbin/tcp/ServerInvokerImpl.java | 26 +++ .../provider/fastbin/StreamInvocationTest.java | 232 +++++++++++++++++++ .../fastbin/streams/InputStreamProxyTest.java | 146 ++++++++++++ .../fastbin/streams/OutputStreamProxyTest.java | 81 +++++++ 15 files changed, 1127 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/475f0132/provider/fastbin/Readme.md ---------------------------------------------------------------------- diff --git a/provider/fastbin/Readme.md b/provider/fastbin/Readme.md index 2997a1b..bc24240 100644 --- a/provider/fastbin/Readme.md +++ b/provider/fastbin/Readme.md @@ -11,6 +11,12 @@ Sync remote calls have a default timeout of 5 minutes. For long running operatio as the return value of the remote method. The client will receive a proxy of that type that will be resolved async as soon as the server finished computation. +## Streaming Data + +When large amount of data (e.g. files) need to be transfered remotely it is not advisable to use large byte arrays as this will allocate a lot of memory. Instead the fastbin transport allows to +use `InputStream` and `OutputStream` as parameter or return value. When a remote method contains such a parameter, the stream is replaced with a proxy implementation that pipes data remotely from/to the original stream. + + ## Endpoint Configuration service.exported.configs: aries.fastbin http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/475f0132/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/Activator.java ---------------------------------------------------------------------- diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/Activator.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/Activator.java index b89de14..e93e873 100644 --- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/Activator.java +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/Activator.java @@ -22,6 +22,8 @@ import java.util.Dictionary; import java.util.Hashtable; import java.util.concurrent.TimeUnit; +import org.apache.aries.rsa.provider.fastbin.io.ClientInvoker; +import org.apache.aries.rsa.provider.fastbin.io.ServerInvoker; import org.apache.aries.rsa.provider.fastbin.util.UuidGenerator; import org.apache.aries.rsa.spi.DistributionProvider; import org.osgi.service.cm.ManagedService; @@ -29,7 +31,10 @@ import org.osgi.service.remoteserviceadmin.RemoteConstants; public class Activator extends BaseActivator implements ManagedService { - private FastBinProvider provider; + static Activator INSTANCE; + FastBinProvider provider; + ClientInvoker client; + ServerInvoker server; @Override protected void doOpen() throws Exception { @@ -38,6 +43,7 @@ public class Activator extends BaseActivator implements ManagedService { @Override protected void doStart() throws Exception { + INSTANCE = this; String uri = getString("uri", "tcp://0.0.0.0:2543"); String exportedAddress = getString("exportedAddress", null); if (exportedAddress == null) { @@ -45,6 +51,8 @@ public class Activator extends BaseActivator implements ManagedService { } long timeout = getLong("timeout", TimeUnit.MINUTES.toMillis(5)); provider = new FastBinProvider(uri, exportedAddress, timeout); + client = provider.getClient(); + server = provider.getServer(); Dictionary props = new Hashtable<>(); props.put(RemoteConstants.REMOTE_INTENTS_SUPPORTED, new String[]{}); props.put(RemoteConstants.REMOTE_CONFIGS_SUPPORTED, provider.getSupportedTypes()); @@ -63,4 +71,16 @@ public class Activator extends BaseActivator implements ManagedService { } } + public ClientInvoker getClient() { + return client; + } + + public ServerInvoker getServer() { + return server; + } + + public static Activator getInstance() { + return INSTANCE; + } + } http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/475f0132/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/FastBinProvider.java ---------------------------------------------------------------------- diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/FastBinProvider.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/FastBinProvider.java index 1491d41..4cf3be7 100644 --- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/FastBinProvider.java +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/FastBinProvider.java @@ -72,6 +72,14 @@ public class FastBinProvider implements DistributionProvider { server.stop(); } + public ClientInvoker getClient() { + return client; + } + + public ServerInvoker getServer() { + return server; + } + @Override public String[] getSupportedTypes() { return new String[] {FASTBIN_CONFIG_TYPE}; http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/475f0132/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/ServerInvoker.java ---------------------------------------------------------------------- diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/ServerInvoker.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/ServerInvoker.java index dd3d83b..cc88aaa 100644 --- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/ServerInvoker.java +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/ServerInvoker.java @@ -18,6 +18,8 @@ */ package org.apache.aries.rsa.provider.fastbin.io; +import org.apache.aries.rsa.provider.fastbin.streams.StreamProvider; + public interface ServerInvoker extends Service { String getConnectAddress(); @@ -26,6 +28,7 @@ public interface ServerInvoker extends Service { void unregisterService(String id); + StreamProvider getStreamProvider(); public interface ServiceFactory { http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/475f0132/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/Chunk.java ---------------------------------------------------------------------- diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/Chunk.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/Chunk.java new file mode 100644 index 0000000..92ea426 --- /dev/null +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/Chunk.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.aries.rsa.provider.fastbin.streams; + +import java.io.Serializable; + +/** + * + * Represents of chunk of data streamed between client and server + *

+ * A chunk comes with a sequence number to verify the correct order of packages + * + */ +public class Chunk implements Serializable { + + /** field serialVersionUID */ + private static final long serialVersionUID = -2809449169706358272L; + private int chunkNumber; + private byte[] data; + private boolean last; + + public Chunk(byte[] data, int chunkNumber) { + this(data,chunkNumber,false); + } + + public Chunk(byte[] data, int chunkNumber, boolean last) { + this.data = data; + this.chunkNumber = chunkNumber; + this.last = last; + } + + + public byte[] getData() { + return data; + } + + public int getChunkNumber() { + return chunkNumber; + } + + public void setLast(boolean last) { + this.last = last; + } + + public boolean isLast() { + return last; + } +} + + + http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/475f0132/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/InputStreamProxy.java ---------------------------------------------------------------------- diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/InputStreamProxy.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/InputStreamProxy.java new file mode 100644 index 0000000..f9d9e2f --- /dev/null +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/InputStreamProxy.java @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.aries.rsa.provider.fastbin.streams; + +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.Serializable; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Proxy; + +import org.apache.aries.rsa.provider.fastbin.Activator; + +public class InputStreamProxy extends InputStream implements Serializable { + + /** field serialVersionUID */ + private static final long serialVersionUID = 4741860068546150748L; + private int streamID; + private String address; + + private transient StreamProvider streamProvider; + private transient byte[] buffer; + private transient int position; + private transient int expectedChunkNumber = 0; + private transient boolean reachedEnd = false; + + public InputStreamProxy(int streamID, String address) { + this.streamID = streamID; + this.address = address; + } + + @Override + public int read() throws IOException { + try{ + return readInternal(); + } + catch (IOException e) { + // clean up on the server side + closeSilent(); + throw e; + } + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + try{ + return super.read(b, off, len); + } + catch (IOException e) { + // clean up on the server side + closeSilent(); + throw e; + } + } + + /** + * @see java.io.InputStream#read() + */ + public int readInternal() throws IOException { + if(buffer == null || position==buffer.length) + fillBuffer(); + + if(position==buffer.length) { + //still no data. + if(reachedEnd) + return -1; + //try again + return read(); + } + return buffer[position++]; + } + + private void fillBuffer() throws IOException { + if(reachedEnd) { + return; + } + position = 0; + Chunk chunk = streamProvider.read(streamID); + if(expectedChunkNumber!=chunk.getChunkNumber()) + throw new IOException("Stream corrupted. Received Chunk "+chunk.getChunkNumber()+" but expected "+expectedChunkNumber); + expectedChunkNumber++; + buffer = chunk.getData(); + reachedEnd = chunk.isLast(); + } + + public int readInternal(byte[] b, int off, int len) throws IOException { + if(len==0) + return 0; + int available = available(); + if(available <= 0) { + if(reachedEnd) + return -1; + fillBuffer(); + return read(b, off, len); + } + int processed = 0; + int ready = Math.min(available, len); + System.arraycopy(buffer, position, b, off, ready); + processed += ready; + position += ready; + // delegate to the next chunk + if (processed == len) { + return processed; + } + int alsoRead = Math.max(0, read(b, off + processed, len - processed)); + return processed + alsoRead; + } + + @Override + public int available() throws IOException { + if(buffer == null) + return 0; + return buffer.length-position; + } + + @Override + public void close() throws IOException { + streamProvider.close(streamID); + } + + private void closeSilent() { + try{ + close(); + } catch (Exception e) { + //NOOP + } + } + + private void readObject(ObjectInputStream stream) + throws IOException, ClassNotFoundException { + stream.defaultReadObject(); + InvocationHandler handler = Activator.getInstance().getClient().getProxy(address, StreamProvider.STREAM_PROVIDER_SERVICE_NAME, getClass().getClassLoader()); + streamProvider = (StreamProvider)Proxy.newProxyInstance(getClass().getClassLoader(), new Class[]{StreamProvider.class}, handler); + } + + protected void setStreamProvider(StreamProvider streamProvider) { + this.streamProvider = streamProvider; + } +} + + + http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/475f0132/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/OutputStreamProxy.java ---------------------------------------------------------------------- diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/OutputStreamProxy.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/OutputStreamProxy.java new file mode 100644 index 0000000..849b47d --- /dev/null +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/OutputStreamProxy.java @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.aries.rsa.provider.fastbin.streams; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.OutputStream; +import java.io.Serializable; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Proxy; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.aries.rsa.provider.fastbin.Activator; + +public class OutputStreamProxy extends OutputStream implements Serializable { + + /** field serialVersionUID */ + private static final long serialVersionUID = -6008791618074159841L; + private int streamID; + private String address; + private transient StreamProvider streamProvider; + private transient int position; + private transient byte[] buffer; + private transient AtomicInteger chunkCounter; + + public OutputStreamProxy(int streamID, String address) { + this.streamID = streamID; + this.address = address; + init(); + } + + + private final void init() { + buffer = new byte[StreamProviderImpl.CHUNK_SIZE]; + chunkCounter = new AtomicInteger(-1); + } + + + @Override + public void close() throws IOException { + flush(); + streamProvider.close(streamID); + } + + private void closeSilent() { + try{ + close(); + } catch (Exception e) { + //NOOP + } + } + + private void readObject(ObjectInputStream stream) + throws IOException, ClassNotFoundException { + stream.defaultReadObject(); + InvocationHandler handler = Activator.getInstance().getClient().getProxy(address, StreamProvider.STREAM_PROVIDER_SERVICE_NAME, getClass().getClassLoader()); + streamProvider = (StreamProvider)Proxy.newProxyInstance(getClass().getClassLoader(), new Class[]{StreamProvider.class}, handler); + init(); + } + + protected void setStreamProvider(StreamProvider streamProvider) { + this.streamProvider = streamProvider; + } + + + @Override + public void write(int b) throws IOException { + try{ + writeInternal(b); + } catch(IOException e) { + closeSilent(); + throw e; + } + + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + try{ + writeInternal(b, off, len); + } catch(IOException e) { + closeSilent(); + throw e; + } + } + + public void writeInternal(int b) throws IOException { + if(position == buffer.length) + flush(); + buffer[position++] = (byte)b; + + } + + public void writeInternal(byte[] b, int off, int len) throws IOException { + if(len <= 0) + return; + int processed = 0; + while(processed < len) { + int available = buffer.length - position; + int chunkLength = Math.min(len-processed, available); + System.arraycopy(b, off, buffer, position, chunkLength); + position += chunkLength; + processed += chunkLength; + if(processed < len) { + //there is more to go, but now the buffer is full -> flush it + flush(); + } + } + } + + @Override + public void flush() throws IOException { + try{ + flushInternal(); + } catch(IOException e) { + closeSilent(); + throw e; + } + } + + public void flushInternal() throws IOException { + if(position==0) + return; + byte[] toSend = buffer; + if(position < buffer.length) { + toSend = new byte[position]; + System.arraycopy(buffer, 0, toSend, 0, position); + } + Chunk chunk = new Chunk(toSend, chunkCounter.incrementAndGet()); + streamProvider.write(streamID, chunk); + position = 0; + } +} + + + http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/475f0132/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/StreamProvider.java ---------------------------------------------------------------------- diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/StreamProvider.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/StreamProvider.java new file mode 100644 index 0000000..4aed70c --- /dev/null +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/StreamProvider.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.aries.rsa.provider.fastbin.streams; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.aries.rsa.provider.fastbin.io.ServerInvoker; + +/** + * StreamProvider is a well-known service that gets auto registered in the {@link ServerInvoker} + * to enable Input/OutputStreams in remote calls + */ +public interface StreamProvider { + + public static final String STREAM_PROVIDER_SERVICE_NAME = "stream-provider"; + + /** + * closes the specified stream and makes it inaccessible from remote + * @param streamID + * @throws IOException + */ + void close(int streamID) throws IOException; + + /** + * reads the next chunk from the specified stream + * @param streamID + * @return the next chunk of data + * @throws IOException + */ + Chunk read(int streamID) throws IOException; + + /** + * writes the next chunk of data to the specified output stream + * @param streamID + * @param chunk + * @throws IOException + */ + void write(int streamID, Chunk chunk) throws IOException; + + /** + * registers a new (local) input stream that will be made available for remote calls. + * @param in + * @return the stream id + */ + int registerStream(InputStream in); + + /** + * registers a new (local) output stream that will be made available for remote calls. + * @param out + * @return the stream id + */ + int registerStream(OutputStream out); + +} + + + http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/475f0132/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/StreamProviderImpl.java ---------------------------------------------------------------------- diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/StreamProviderImpl.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/StreamProviderImpl.java new file mode 100644 index 0000000..c23370f --- /dev/null +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/StreamProviderImpl.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.aries.rsa.provider.fastbin.streams; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +public class StreamProviderImpl implements StreamProvider { + + private ConcurrentHashMap streams = new ConcurrentHashMap<>(); + private ConcurrentHashMap chunks = new ConcurrentHashMap<>(); + private AtomicInteger counter = new AtomicInteger(0); + protected static final int CHUNK_SIZE = 4096*16; //64k + private static final byte[] EMPTY = new byte[0]; + + ThreadLocal buffer = new ThreadLocal(){ + @Override + protected byte[] initialValue() { + return new byte[CHUNK_SIZE]; + } + }; + + public int registerStream(InputStream in) { + int streamID = counter.incrementAndGet(); + streams.put(streamID, in); + chunks.put(streamID, new AtomicInteger(-1)); + return streamID; + } + + + @Override + public int registerStream(OutputStream out) { + int streamID = counter.incrementAndGet(); + streams.put(streamID, out); + chunks.put(streamID, new AtomicInteger(-1)); + return streamID; + } + + @Override + public void close(int streamID) throws IOException { + Closeable stream = streams.remove(streamID); + chunks.remove(streamID); + if(stream != null) { + stream.close(); + } + } + + @Override + public Chunk read(int streamID) throws IOException { + InputStream inputStream = getStream(streamID); + AtomicInteger chunkNumber = chunks.get(streamID); + byte[] result = buffer.get(); + int read = inputStream.read(result); + if(read<0) { + close(streamID); //we are finished, best clean it up right away + return new Chunk(EMPTY, chunkNumber.incrementAndGet(), true); + } + if(read!=result.length) { + byte[] tmp = new byte[read]; + System.arraycopy(result, 0, tmp, 0, read); + result = tmp; + } + return new Chunk(result, chunkNumber.incrementAndGet()); + } + + @Override + public void write(int streamID, Chunk chunk) throws IOException { + OutputStream out = getStream(streamID); + int nextChunkNumber = chunks.get(streamID).incrementAndGet(); + if(chunk.getChunkNumber() != nextChunkNumber) { + throw new IOException("Stream corrupted. Received Chunk "+chunk.getChunkNumber()+" but expected "+nextChunkNumber); + } + out.write(chunk.getData()); + } + + @SuppressWarnings({"unchecked"}) + private T getStream(int id) throws IOException { + Closeable closeable = streams.get(id); + if(closeable == null) + throw new IOException("No Stream with id " + id + "available"); + try { + T result = (T)closeable; + return result; + } + catch (ClassCastException e) { + throw new IOException("No Stream with id " + id + "available"); + } + } + +} + + + http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/475f0132/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AbstractInvocationStrategy.java ---------------------------------------------------------------------- diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AbstractInvocationStrategy.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AbstractInvocationStrategy.java index 75f06a9..2e9937a 100644 --- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AbstractInvocationStrategy.java +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AbstractInvocationStrategy.java @@ -18,10 +18,15 @@ */ package org.apache.aries.rsa.provider.fastbin.tcp; +import java.io.InputStream; +import java.io.OutputStream; import java.lang.reflect.Method; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.aries.rsa.provider.fastbin.Activator; import org.apache.aries.rsa.provider.fastbin.api.SerializationStrategy; +import org.apache.aries.rsa.provider.fastbin.streams.InputStreamProxy; +import org.apache.aries.rsa.provider.fastbin.streams.OutputStreamProxy; import org.fusesource.hawtbuf.DataByteArrayInputStream; import org.fusesource.hawtbuf.DataByteArrayOutputStream; import org.osgi.framework.ServiceException; @@ -35,10 +40,41 @@ public abstract class AbstractInvocationStrategy implements InvocationStrategy @Override public ResponseFuture request(SerializationStrategy serializationStrategy, ClassLoader loader, Method method, Object[] args, DataByteArrayOutputStream requestStream) throws Exception { + replaceStreamParameters(method, args); encodeRequest(serializationStrategy, loader, method, args, requestStream); return createResponse(serializationStrategy, loader,method, args); } + protected void replaceStreamParameters(Method method, Object[] args) { + Class< ? >[] types = method.getParameterTypes(); + if(args==null) + return; + for (int i = 0; i < args.length; i++) { + if(isStream(types[i])) { + args[i] = replaceStream(args[i]); + } + } + } + + protected Object replaceStream(Object value) { + if (value instanceof InputStream) { + InputStream in = (InputStream)value; + int streamID = Activator.getInstance().getServer().getStreamProvider().registerStream(in); + value = new InputStreamProxy(streamID, Activator.getInstance().getServer().getConnectAddress()); + } + else if (value instanceof OutputStream) { + OutputStream out = (OutputStream)value; + int streamID = Activator.getInstance().getServer().getStreamProvider().registerStream(out); + value = new OutputStreamProxy(streamID, Activator.getInstance().getServer().getConnectAddress()); + } + return value; + } + + protected boolean isStream(Class clazz) { + return clazz==InputStream.class || clazz==OutputStream.class; + } + + /** * encodes the request to the stream * @param serializationStrategy http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/475f0132/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/BlockingInvocationStrategy.java ---------------------------------------------------------------------- diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/BlockingInvocationStrategy.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/BlockingInvocationStrategy.java index 5190157..753b447 100644 --- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/BlockingInvocationStrategy.java +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/BlockingInvocationStrategy.java @@ -100,6 +100,9 @@ public class BlockingInvocationStrategy extends AbstractInvocationStrategy { final Object[] args = new Object[types.length]; serializationStrategy.decodeRequest(loader, types, requestStream, args); value = method.invoke(target, args); + if(isStream(method.getReturnType())) { + value = replaceStream(value); + } } catch (Throwable t) { if (t instanceof InvocationTargetException) { error = t.getCause(); http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/475f0132/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ServerInvokerImpl.java ---------------------------------------------------------------------- diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ServerInvokerImpl.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ServerInvokerImpl.java index c365a56..d56d64c 100644 --- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ServerInvokerImpl.java +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ServerInvokerImpl.java @@ -38,6 +38,8 @@ import org.apache.aries.rsa.provider.fastbin.io.Transport; import org.apache.aries.rsa.provider.fastbin.io.TransportAcceptListener; import org.apache.aries.rsa.provider.fastbin.io.TransportListener; import org.apache.aries.rsa.provider.fastbin.io.TransportServer; +import org.apache.aries.rsa.provider.fastbin.streams.StreamProvider; +import org.apache.aries.rsa.provider.fastbin.streams.StreamProviderImpl; import org.fusesource.hawtbuf.Buffer; import org.fusesource.hawtbuf.BufferEditor; import org.fusesource.hawtbuf.DataByteArrayInputStream; @@ -67,6 +69,7 @@ public class ServerInvokerImpl implements ServerInvoker, Dispatched { private final Map serializationStrategies; protected final TransportServer server; protected final Map holders = new HashMap(); + private StreamProvider streamProvider; static class MethodData { @@ -165,6 +168,11 @@ public class ServerInvokerImpl implements ServerInvoker, Dispatched { return this.server.getConnectAddress(); } + @Override + public StreamProvider getStreamProvider() { + return streamProvider; + } + public void registerService(final String id, final ServiceFactory service, final ClassLoader classLoader) { queue().execute(new Runnable() { public void run() { @@ -186,9 +194,27 @@ public class ServerInvokerImpl implements ServerInvoker, Dispatched { } public void start(Runnable onComplete) throws Exception { + registerStreamProvider(); this.server.start(onComplete); } + private void registerStreamProvider() { + streamProvider = new StreamProviderImpl(); + registerService(StreamProvider.STREAM_PROVIDER_SERVICE_NAME, new ServerInvoker.ServiceFactory() { + + @Override + public Object get() { + return streamProvider; + } + + @Override + public void unget(){ + // nothing to do + } + }, getClass().getClassLoader()); + + } + public void stop() { stop(null); } http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/475f0132/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/StreamInvocationTest.java ---------------------------------------------------------------------- diff --git a/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/StreamInvocationTest.java b/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/StreamInvocationTest.java new file mode 100644 index 0000000..241eda0 --- /dev/null +++ b/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/StreamInvocationTest.java @@ -0,0 +1,232 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.aries.rsa.provider.fastbin; + + +import static org.junit.Assert.*; + +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Proxy; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.HashMap; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import org.apache.aries.rsa.provider.fastbin.InvocationTest.HelloImpl; +import org.apache.aries.rsa.provider.fastbin.api.SerializationStrategy; +import org.apache.aries.rsa.provider.fastbin.io.ServerInvoker; +import org.apache.aries.rsa.provider.fastbin.streams.StreamProvider; +import org.apache.aries.rsa.provider.fastbin.tcp.ClientInvokerImpl; +import org.apache.aries.rsa.provider.fastbin.tcp.ServerInvokerImpl; +import org.fusesource.hawtdispatch.Dispatch; +import org.fusesource.hawtdispatch.DispatchQueue; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + + +public class StreamInvocationTest { + + private ServerInvokerImpl server; + private ClientInvokerImpl client; + private TestService testService; + + + @Before + public void setup() throws Exception + { + DispatchQueue queue = Dispatch.createQueue(); + HashMap map = new HashMap(); + server = new ServerInvokerImpl("tcp://localhost:0", queue, map); + server.start(); + + client = new ClientInvokerImpl(queue, map); + client.start(); +// server.stop(); + server.registerService("service-id", new ServerInvoker.ServiceFactory() + { + public Object get() + { + return new TestServiceImpl(); + } + + + public void unget() + {} + }, TestServiceImpl.class.getClassLoader()); + + InvocationHandler handler = client.getProxy(server.getConnectAddress(), "service-id", TestServiceImpl.class.getClassLoader()); + testService = (TestService)Proxy.newProxyInstance(HelloImpl.class.getClassLoader(), new Class[]{TestService.class}, handler); + Activator.INSTANCE = new Activator(); + Activator.INSTANCE.client = client; + Activator.INSTANCE.server = server; + } + + + @After + public void tearDown() + { + server.stop(); + client.stop(); + } + + + @Test + public void testToString() throws IOException { + assertEquals("Test",testService.toString(new ByteArrayInputStream("Test".getBytes()))); + + } + + @Test(timeout=5000) + public void testToStringLarge() throws IOException { + InputStream in = fillStream('a', 1000000); + long time = System.currentTimeMillis(); + String result = testService.toString(in); //roughly 1 MB of data + System.out.println("Transfered 1MB of data in "+(System.currentTimeMillis()-time)+"ms"); + assertEquals(1000000, result.length()); + for(int i=0;i future = testService.digest(new ByteArrayInputStream(testString.getBytes())); + assertArrayEquals(digest,future.get()); + + } + + public interface TestService { + String toString(InputStream in) throws IOException; + + InputStream toStream(String s) throws IOException; + + void intoStream(OutputStream out, String string) throws IOException; + + Future digest(InputStream in) throws IOException; + } + + public class TestServiceImpl implements TestService { + @Override + public String toString(InputStream in) throws IOException { + StringBuilder b = new StringBuilder(); + try (BufferedReader r = new BufferedReader(new InputStreamReader(in))) { + b.append(r.readLine()); + } + return b.toString(); + } + + @Override + public InputStream toStream(String s) throws IOException { + return new ByteArrayInputStream(s.getBytes()); + } + + @Override + public void intoStream(final OutputStream out, String string) throws IOException { + new Thread(() -> { + try{ + out.write(string.getBytes()); + out.close(); + } catch(Exception e) { + e.printStackTrace(); + } + }).start(); + } + + @Override + public Future digest(InputStream in) throws IOException { + return CompletableFuture.supplyAsync(() -> { + try { + MessageDigest digest = MessageDigest.getInstance("MD5"); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + int i; + while((i = in.read()) != -1) { + out.write(i); + } + byte[] md5 = digest.digest(out.toByteArray()); + return md5; + } + catch (Exception e) { + e.printStackTrace(); + } + return null; + }); + } + } + + protected InputStream fillStream(char c, int repetitions) { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + for (int i=0; i