aries-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gno...@apache.org
Subject [3/3] aries-rsa git commit: [ARIES-1526] Add a fast nio/protobuf provider
Date Mon, 11 Apr 2016 12:05:38 GMT
[ARIES-1526] Add a fast nio/protobuf provider

Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/62d835de
Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/62d835de
Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/62d835de

Branch: refs/heads/master
Commit: 62d835dec07d9592b1ea3fe038cea39b1e9af9aa
Parents: 58bb694
Author: Guillaume Nodet <gnodet@apache.org>
Authored: Mon Apr 11 12:08:14 2016 +0200
Committer: Guillaume Nodet <gnodet@apache.org>
Committed: Mon Apr 11 14:04:42 2016 +0200

----------------------------------------------------------------------
 features/src/main/resources/features.xml        |   9 +-
 parent/pom.xml                                  |  19 +-
 provider/fastbin/bnd.bnd                        |   1 +
 provider/fastbin/pom.xml                        |  62 ++
 .../aries/rsa/provider/fastbin/Activator.java   |  66 ++
 .../rsa/provider/fastbin/BaseActivator.java     | 290 +++++++
 .../rsa/provider/fastbin/FastBinProvider.java   | 146 ++++
 .../rsa/provider/fastbin/api/AsyncCallback.java |  26 +
 .../fastbin/api/AsyncCallbackFuture.java        |  43 +
 .../rsa/provider/fastbin/api/Dispatched.java    |  31 +
 .../api/ObjectSerializationStrategy.java        |  73 ++
 .../api/ProtobufSerializationStrategy.java      | 127 +++
 .../rsa/provider/fastbin/api/Serialization.java |  29 +
 .../fastbin/api/SerializationStrategy.java      |  38 +
 .../rsa/provider/fastbin/io/ClientInvoker.java  |  24 +
 .../rsa/provider/fastbin/io/ProtocolCodec.java  |  97 +++
 .../rsa/provider/fastbin/io/ServerInvoker.java  |  34 +
 .../aries/rsa/provider/fastbin/io/Service.java  |  52 ++
 .../rsa/provider/fastbin/io/Transport.java      |  95 +++
 .../fastbin/io/TransportAcceptListener.java     |  31 +
 .../provider/fastbin/io/TransportListener.java  |  55 ++
 .../provider/fastbin/io/TransportServer.java    |  73 ++
 .../fastbin/tcp/AsyncInvocationStrategy.java    | 188 +++++
 .../fastbin/tcp/BlockingInvocationStrategy.java | 126 +++
 .../provider/fastbin/tcp/ClientInvokerImpl.java | 327 ++++++++
 .../fastbin/tcp/InvocationStrategy.java         |  34 +
 .../fastbin/tcp/LengthPrefixedCodec.java        | 175 ++++
 .../provider/fastbin/tcp/ResponseFuture.java    |  31 +
 .../provider/fastbin/tcp/ServerInvokerImpl.java | 314 +++++++
 .../rsa/provider/fastbin/tcp/TcpTransport.java  | 828 +++++++++++++++++++
 .../fastbin/tcp/TcpTransportFactory.java        | 123 +++
 .../fastbin/tcp/TcpTransportServer.java         | 231 ++++++
 .../rsa/provider/fastbin/tcp/TransportPool.java | 256 ++++++
 .../util/ClassLoaderObjectInputStream.java      |  86 ++
 .../fastbin/util/IntrospectionSupport.java      | 362 ++++++++
 .../provider/fastbin/util/StringSupport.java    |  40 +
 .../rsa/provider/fastbin/util/URISupport.java   | 332 ++++++++
 .../provider/fastbin/util/UuidGenerator.java    | 178 ++++
 .../rsa/provider/fastbin/InvocationTest.java    | 581 +++++++++++++
 .../aries/rsa/provider/fastbin/ManagerTest.java | 106 +++
 .../provider/fastbin/TransportFailureTest.java  | 128 +++
 .../fastbin/tcp/LengthPrefixedCodecTest.java    | 150 ++++
 provider/fastbin/src/test/proto/example.proto   |  28 +
 .../fastbin/src/test/resources/log4j.properties |  35 +
 provider/pom.xml                                |   1 +
 45 files changed, 6079 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/62d835de/features/src/main/resources/features.xml
----------------------------------------------------------------------
diff --git a/features/src/main/resources/features.xml b/features/src/main/resources/features.xml
index 1f9aea0..ba9bc93 100644
--- a/features/src/main/resources/features.xml
+++ b/features/src/main/resources/features.xml
@@ -11,7 +11,14 @@
         <feature>aries-rsa-core</feature>
         <bundle>mvn:org.apache.aries.rsa.provider/org.apache.aries.rsa.provider.tcp/${project.version}</bundle>
     </feature>
-    
+
+    <feature name="aries-rsa-provider-fastbin" version="${project.version}">
+        <feature>aries-rsa-core</feature>
+        <bundle>mvn:org.fusesource.hawtdispatch/hawtdispatch/${hawtdispatch.version}</bundle>
+        <bundle>mvn:org.fusesource.hawtbuf/hawtbuf/${hawtbuf.version}</bundle>
+        <bundle>mvn:org.apache.aries.rsa.provider/org.apache.aries.rsa.provider.fastbin/${project.version}</bundle>
+    </feature>
+
     <feature name="aries-rsa-discovery-local" version="${project.version}">
         <feature>aries-rsa-core</feature>
         <bundle>mvn:org.apache.aries.rsa.discovery/org.apache.aries.rsa.discovery.local/${project.version}</bundle>

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/62d835de/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index bdef21b..3ffada2 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -33,6 +33,8 @@
         <rsa.version>1.0.0</rsa.version>
         <slf4j.version>1.7.14</slf4j.version>
         <log4j.version>1.2.6</log4j.version>
+        <hawtdispatch.version>1.21</hawtdispatch.version>
+        <hawtbuf.version>1.11</hawtbuf.version>
         <exam.version>4.8.0</exam.version>
         <cxf.resources.base.path />
         <cxf.checkstyle.extension />
@@ -104,7 +106,22 @@
                 <artifactId>org.apache.aries.rsa.spi</artifactId>
                 <version>${project.version}</version>
             </dependency>
-            
+            <dependency>
+                <groupId>org.fusesource.hawtdispatch</groupId>
+                <artifactId>hawtdispatch</artifactId>
+                <version>${hawtdispatch.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.fusesource.hawtbuf</groupId>
+                <artifactId>hawtbuf</artifactId>
+                <version>${hawtbuf.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.fusesource.hawtbuf</groupId>
+                <artifactId>hawtbuf-proto</artifactId>
+                <version>${hawtbuf.version}</version>
+            </dependency>
+
             <dependency>
                 <groupId>junit</groupId>
                 <artifactId>junit</artifactId>

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/62d835de/provider/fastbin/bnd.bnd
----------------------------------------------------------------------
diff --git a/provider/fastbin/bnd.bnd b/provider/fastbin/bnd.bnd
new file mode 100644
index 0000000..5249b00
--- /dev/null
+++ b/provider/fastbin/bnd.bnd
@@ -0,0 +1 @@
+Bundle-Activator:  org.apache.aries.rsa.provider.fastbin.Activator

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/62d835de/provider/fastbin/pom.xml
----------------------------------------------------------------------
diff --git a/provider/fastbin/pom.xml b/provider/fastbin/pom.xml
new file mode 100644
index 0000000..a748035
--- /dev/null
+++ b/provider/fastbin/pom.xml
@@ -0,0 +1,62 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.aries.rsa</groupId>
+        <artifactId>org.apache.aries.rsa.parent</artifactId>
+        <version>1.9-SNAPSHOT</version>
+        <relativePath>../../parent/pom.xml</relativePath>
+    </parent>
+
+    <groupId>org.apache.aries.rsa.provider</groupId>
+    <artifactId>org.apache.aries.rsa.provider.fastbin</artifactId>
+    <packaging>bundle</packaging>
+    <name>Aries Remote Service Admin provider FastBin</name>
+    <description>Provider for Java Serialization over FastBin</description>
+
+    <properties>
+        <topDirectoryLocation>../..</topDirectoryLocation>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.aries.rsa</groupId>
+            <artifactId>org.apache.aries.rsa.spi</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.fusesource.hawtdispatch</groupId>
+            <artifactId>hawtdispatch</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.fusesource.hawtbuf</groupId>
+            <artifactId>hawtbuf</artifactId>
+        </dependency>
+        <!-- this is only needed you you want to use the ProtobufSerializationStrategy -->
+        <dependency>
+            <groupId>org.fusesource.hawtbuf</groupId>
+            <artifactId>hawtbuf-proto</artifactId>
+            <optional>true</optional>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.fusesource.hawtbuf</groupId>
+                <artifactId>hawtbuf-protoc</artifactId>
+                <version>${hawtbuf.version}</version>
+                <configuration>
+                    <type>alt</type>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>compile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/62d835de/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/Activator.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/Activator.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/Activator.java
new file mode 100644
index 0000000..b89de14
--- /dev/null
+++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/Activator.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.provider.fastbin;
+
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.aries.rsa.provider.fastbin.util.UuidGenerator;
+import org.apache.aries.rsa.spi.DistributionProvider;
+import org.osgi.service.cm.ManagedService;
+import org.osgi.service.remoteserviceadmin.RemoteConstants;
+
+public class Activator extends BaseActivator implements ManagedService {
+
+    private FastBinProvider provider;
+
+    @Override
+    protected void doOpen() throws Exception {
+        manage("org.apache.aries.rsa.provider.fastbin");
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        String uri = getString("uri", "tcp://0.0.0.0:2543");
+        String exportedAddress = getString("exportedAddress", null);
+        if (exportedAddress == null) {
+            exportedAddress = UuidGenerator.getHostName();
+        }
+        long timeout = getLong("timeout", TimeUnit.MINUTES.toMillis(5));
+        provider = new FastBinProvider(uri, exportedAddress, timeout);
+        Dictionary<String, Object> props = new Hashtable<>();
+        props.put(RemoteConstants.REMOTE_INTENTS_SUPPORTED, new String[]{});
+        props.put(RemoteConstants.REMOTE_CONFIGS_SUPPORTED, provider.getSupportedTypes());
+        register(DistributionProvider.class, provider, props);
+    }
+
+    @Override
+    protected void doStop() {
+        super.doStop();
+        if (provider != null) {
+            try {
+                provider.close();
+            } finally {
+                provider = null;
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/62d835de/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/BaseActivator.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/BaseActivator.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/BaseActivator.java
new file mode 100644
index 0000000..6b7b6fe
--- /dev/null
+++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/BaseActivator.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.rsa.provider.fastbin;
+
+import java.io.InputStream;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.osgi.framework.BundleActivator;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.Constants;
+import org.osgi.framework.ServiceRegistration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BaseActivator implements BundleActivator, Runnable {
+
+    protected final Logger logger = LoggerFactory.getLogger(getClass());
+    protected BundleContext bundleContext;
+
+    protected ExecutorService executor = new ThreadPoolExecutor(0, 1, 0L, TimeUnit.MILLISECONDS,
+            new LinkedBlockingQueue<Runnable>());
+    private AtomicBoolean scheduled = new AtomicBoolean();
+
+    private long schedulerStopTimeout = TimeUnit.MILLISECONDS.convert(30, TimeUnit.SECONDS);
+
+    private List<ServiceRegistration> registrations;
+    private ServiceRegistration managedServiceRegistration;
+    private Dictionary<String, ?> configuration;
+
+    public long getSchedulerStopTimeout() {
+        return schedulerStopTimeout;
+    }
+
+    public void setSchedulerStopTimeout(long schedulerStopTimeout) {
+        this.schedulerStopTimeout = schedulerStopTimeout;
+    }
+
+    @Override
+    public void start(BundleContext context) throws Exception {
+        bundleContext = context;
+        scheduled.set(true);
+        doOpen();
+        scheduled.set(false);
+        if (managedServiceRegistration == null) {
+            try {
+                doStart();
+            } catch (Exception e) {
+                logger.warn("Error starting activator", e);
+                doStop();
+            }
+        } else {
+            reconfigure();
+        }
+    }
+
+    @Override
+    public void stop(BundleContext context) throws Exception {
+        scheduled.set(true);
+        doClose();
+        executor.shutdown();
+        executor.awaitTermination(schedulerStopTimeout, TimeUnit.MILLISECONDS);
+        doStop();
+    }
+
+    protected void doOpen() throws Exception {
+        URL data = bundleContext.getBundle().getResource("OSGI-INF/karaf-tracker/" + getClass().getName());
+        if (data != null) {
+            Properties props = new Properties();
+            try (InputStream is = data.openStream()) {
+                props.load(is);
+            }
+            for (String key : props.stringPropertyNames()) {
+                if ("pid".equals(key)) {
+                    manage(props.getProperty(key));
+                }
+            }
+        }
+    }
+
+    protected void doClose() {
+        if (managedServiceRegistration != null) {
+            managedServiceRegistration.unregister();
+        }
+    }
+
+    protected void doStart() throws Exception {
+    }
+
+    protected void doStop() {
+        if (registrations != null) {
+            for (ServiceRegistration reg : registrations) {
+                reg.unregister();
+            }
+            registrations = null;
+        }
+    }
+
+    /**
+     * Called in {@link #doOpen()}.
+     *
+     * @param pid The configuration PID to manage (ManagedService).
+     */
+    protected void manage(String pid) {
+        Hashtable<String, Object> props = new Hashtable<>();
+        props.put(Constants.SERVICE_PID, pid);
+        managedServiceRegistration = bundleContext.registerService(
+                "org.osgi.service.cm.ManagedService", this, props);
+    }
+
+    public void updated(Dictionary<String, ?> properties) {
+        this.configuration = properties;
+        reconfigure();
+    }
+
+    protected Dictionary<String, ?> getConfiguration() {
+        return configuration;
+    }
+
+    /**
+     * Called in {@link #doStart()}.
+     *
+     * @param key The configuration key
+     * @param def The default value.
+     * @return The value of the configuration key if found, the default value else.
+     */
+    protected int getInt(String key, int def) {
+        if (configuration != null) {
+            Object val = configuration.get(key);
+            if (val instanceof Number) {
+                return ((Number) val).intValue();
+            } else if (val != null) {
+                return Integer.parseInt(val.toString());
+            }
+        }
+        return def;
+    }
+
+    /**
+     * Called in {@link #doStart()}.
+     *
+     * @param key The configuration key.
+     * @param def The default value.
+     * @return The value of the configuration key if found, the default value else.
+     */
+    protected boolean getBoolean(String key, boolean def) {
+        if (configuration != null) {
+            Object val = configuration.get(key);
+            if (val instanceof Boolean) {
+                return (Boolean) val;
+            } else if (val != null) {
+                return Boolean.parseBoolean(val.toString());
+            }
+        }
+        return def;
+    }
+
+    /**
+     * Called in {@link #doStart()}.
+     *
+     * @param key The configuration key.
+     * @param def The default value.
+     * @return The value of the configuration key if found, the default value else.
+     */
+    protected long getLong(String key, long def) {
+        if (configuration != null) {
+            Object val = configuration.get(key);
+            if (val instanceof Number) {
+                return ((Number) val).longValue();
+            } else if (val != null) {
+                return Long.parseLong(val.toString());
+            }
+        }
+        return def;
+    }
+
+    /**
+     * Called in {@link #doStart()}.
+     *
+     * @param key The configuration key.
+     * @param def The default value.
+     * @return The value of the configuration key if found, the default value else.
+     */
+    protected String getString(String key, String def) {
+        if (configuration != null) {
+            Object val = configuration.get(key);
+            if (val != null) {
+                return val.toString();
+            }
+        }
+        return def;
+    }
+
+    protected void reconfigure() {
+        if (scheduled.compareAndSet(false, true)) {
+            executor.submit(this);
+        }
+    }
+
+    @Override
+    public void run() {
+        scheduled.set(false);
+        doStop();
+        try {
+            doStart();
+        } catch (Exception e) {
+            logger.warn("Error starting activator", e);
+            doStop();
+        }
+    }
+
+    /**
+     * Called in {@link #doStart()}.
+     *
+     * @param clazz The service interface to register.
+     * @param <T> The service type.
+     * @param service The actual service instance to register.
+     */
+    protected <T> void register(Class<T> clazz, T service) {
+        register(clazz, service, null);
+    }
+
+    /**
+     * Called in {@link #doStart()}.
+     *
+     * @param clazz The service interface to register.
+     * @param <T> The service type.
+     * @param service The actual service instance to register.
+     * @param props The service properties to register.
+     */
+    protected <T> void register(Class<T> clazz, T service, Dictionary<String, ?> props) {
+        trackRegistration(bundleContext.registerService(clazz, service, props));
+    }
+
+    /**
+     * Called in {@link #doStart()}.
+     *
+     * @param clazz The service interfaces to register.
+     * @param service The actual service instance to register.
+     */
+    protected void register(Class[] clazz, Object service) {
+        register(clazz, service, null);
+    }
+
+    /**
+     * Called in {@link #doStart()}.
+     *
+     * @param clazz The service interfaces to register.
+     * @param service The actual service instance to register.
+     * @param props The service properties to register.
+     */
+    protected void register(Class[] clazz, Object service, Dictionary<String, ?> props) {
+        String[] names = new String[clazz.length];
+        for (int i = 0; i < clazz.length; i++) {
+            names[i] = clazz[i].getName();
+        }
+        trackRegistration(bundleContext.registerService(names, service, props));
+    }
+
+    private void trackRegistration(ServiceRegistration registration) {
+        if (registrations == null) {
+            registrations = new ArrayList<>();
+        }
+        registrations.add(registration);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/62d835de/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/FastBinProvider.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/FastBinProvider.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/FastBinProvider.java
new file mode 100644
index 0000000..476a699
--- /dev/null
+++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/FastBinProvider.java
@@ -0,0 +1,146 @@
+package org.apache.aries.rsa.provider.fastbin;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Proxy;
+import java.net.URI;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.aries.rsa.provider.fastbin.api.SerializationStrategy;
+import org.apache.aries.rsa.provider.fastbin.io.ClientInvoker;
+import org.apache.aries.rsa.provider.fastbin.io.ServerInvoker;
+import org.apache.aries.rsa.provider.fastbin.tcp.ClientInvokerImpl;
+import org.apache.aries.rsa.provider.fastbin.tcp.ServerInvokerImpl;
+import org.apache.aries.rsa.provider.fastbin.util.UuidGenerator;
+import org.apache.aries.rsa.spi.DistributionProvider;
+import org.apache.aries.rsa.spi.Endpoint;
+import org.apache.aries.rsa.spi.IntentUnsatisfiedException;
+import org.fusesource.hawtdispatch.Dispatch;
+import org.fusesource.hawtdispatch.DispatchQueue;
+import org.osgi.framework.BundleContext;
+import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.osgi.service.remoteserviceadmin.RemoteConstants;
+
+public class FastBinProvider implements DistributionProvider {
+
+    public static final String FASTBIN_CONFIG_TYPE = "aries.fastbin";
+
+    public static final String FASTBIN_ADDRESS = FASTBIN_CONFIG_TYPE + ".address";
+
+    private final String uri;
+    private final String exportedAddress;
+    private final long timeout;
+
+    private final DispatchQueue queue = Dispatch.createQueue();
+    private final Map<String, SerializationStrategy> serializationStrategies = new ConcurrentHashMap<>();
+
+    private ClientInvoker client;
+    private ServerInvoker server;
+
+    public FastBinProvider(java.lang.String uri, java.lang.String exportedAddress, long timeout) throws Exception {
+        this.uri = uri;
+        this.exportedAddress = exportedAddress;
+        this.timeout = timeout;
+        // Create client and server
+        this.client = new ClientInvokerImpl(queue, timeout, serializationStrategies);
+        this.server = new ServerInvokerImpl(uri, queue, serializationStrategies);
+        this.client.start();
+        this.server.start();
+    }
+
+    public void close() {
+        client.stop();
+        server.stop();
+    }
+
+    @Override
+    public String[] getSupportedTypes() {
+        return new String[] {FASTBIN_CONFIG_TYPE};
+    }
+
+    @Override
+    public Endpoint exportService(final Object serviceO,
+                                  BundleContext serviceContext,
+                                  Map<String, Object> effectiveProperties,
+                                  Class[] exportedInterfaces) {
+
+        // Compute properties
+        /*
+        Map<String, Object> properties = new TreeMap<String, Object>(String.CASE_INSENSITIVE_ORDER);
+        for (String k : reference.getPropertyKeys()) {
+            properties.put(k, reference.getProperty(k));
+        }
+        // Bail out if there is any intents specified, we don't support any
+        Set<String> intents = Utils.normalize(properties.get(SERVICE_EXPORTED_INTENTS));
+        Set<String> extraIntents = Utils.normalize(properties.get(SERVICE_EXPORTED_INTENTS_EXTRA));
+        if (!intents.isEmpty() || !extraIntents.isEmpty()) {
+            throw new UnsupportedOperationException();
+        }
+        // Bail out if there are any configurations specified, we don't support any
+        Set<String> configs = Utils.normalize(properties.get(SERVICE_EXPORTED_CONFIGS));
+        if (configs.isEmpty()) {
+            configs.add(CONFIG);
+        } else if (!configs.contains(CONFIG)) {
+            throw new UnsupportedOperationException();
+        }
+
+        URI connectUri = new URI(this.server.getConnectAddress());
+        String fabricAddress = connectUri.getScheme() + "://" + exportedAddress + ":" + connectUri.getPort();
+
+        properties.remove(SERVICE_EXPORTED_CONFIGS);
+        properties.put(SERVICE_IMPORTED_CONFIGS, new String[] { CONFIG });
+        properties.put(ENDPOINT_FRAMEWORK_UUID, this.uuid);
+        properties.put(FABRIC_ADDRESS, fabricAddress);
+
+        String uuid = UuidGenerator.getUUID();
+        properties.put(ENDPOINT_ID, uuid);
+        */
+
+        String endpointId = UuidGenerator.getUUID();
+        effectiveProperties.put(RemoteConstants.ENDPOINT_ID, endpointId);
+
+        URI connectUri = URI.create(this.server.getConnectAddress());
+        String fastbinAddress = connectUri.getScheme() + "://" + exportedAddress + ":" + connectUri.getPort();
+        effectiveProperties.put(FASTBIN_ADDRESS, fastbinAddress);
+        effectiveProperties.put(RemoteConstants.SERVICE_IMPORTED_CONFIGS, getSupportedTypes());
+
+        // Now, export the service
+        final EndpointDescription description = new EndpointDescription(effectiveProperties);
+
+        // Export it
+        server.registerService(description.getId(), new ServerInvoker.ServiceFactory() {
+            public Object get() {
+                return serviceO;
+            }
+            public void unget() {
+            }
+        }, serviceO.getClass().getClassLoader());
+
+        return new Endpoint() {
+            @Override
+            public EndpointDescription description() {
+                return description;
+            }
+
+            @Override
+            public void close() throws IOException {
+                server.unregisterService(description.getId());
+            }
+        };
+    }
+
+    @Override
+    public Object importEndpoint(ClassLoader cl,
+                                 BundleContext consumerContext,
+                                 Class[] interfaces,
+                                 EndpointDescription endpoint)
+            throws IntentUnsatisfiedException {
+
+        String address = (String) endpoint.getProperties().get(FASTBIN_ADDRESS);
+        InvocationHandler handler = client.getProxy(address, endpoint.getId(), cl);
+        return Proxy.newProxyInstance(cl, interfaces, handler);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/62d835de/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/AsyncCallback.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/AsyncCallback.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/AsyncCallback.java
new file mode 100644
index 0000000..5ca7bbd
--- /dev/null
+++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/AsyncCallback.java
@@ -0,0 +1,26 @@
+/**
+ *  Copyright 2005-2015 Red Hat, Inc.
+ *
+ *  Red Hat licenses this file to you under the Apache License, version
+ *  2.0 (the "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ *  implied.  See the License for the specific language governing
+ *  permissions and limitations under the License.
+ */
+package org.apache.aries.rsa.provider.fastbin.api;
+
+/**
+ * <p>
+ * </p>
+ *
+ */
+public interface AsyncCallback<T> {
+    void onSuccess(T result);
+    void onFailure(Throwable failure);
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/62d835de/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/AsyncCallbackFuture.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/AsyncCallbackFuture.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/AsyncCallbackFuture.java
new file mode 100644
index 0000000..71906bd
--- /dev/null
+++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/AsyncCallbackFuture.java
@@ -0,0 +1,43 @@
+/**
+ *  Copyright 2005-2015 Red Hat, Inc.
+ *
+ *  Red Hat licenses this file to you under the Apache License, version
+ *  2.0 (the "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ *  implied.  See the License for the specific language governing
+ *  permissions and limitations under the License.
+ */
+package org.apache.aries.rsa.provider.fastbin.api;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.FutureTask;
+
+/**
+ * <p>
+ * </p>
+ *
+ */
+public class AsyncCallbackFuture<T> extends FutureTask<T> implements AsyncCallback<T> {
+
+    public AsyncCallbackFuture() {
+        super(new Callable<T>() {
+            public T call() {
+                return null;
+            }
+        });
+    }
+
+    public void onSuccess(T result) {
+        super.set(result);
+    }
+
+    public void onFailure(Throwable failure) {
+        super.setException(failure);
+    }
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/62d835de/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/Dispatched.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/Dispatched.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/Dispatched.java
new file mode 100644
index 0000000..77864d1
--- /dev/null
+++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/Dispatched.java
@@ -0,0 +1,31 @@
+/**
+ *  Copyright 2005-2015 Red Hat, Inc.
+ *
+ *  Red Hat licenses this file to you under the Apache License, version
+ *  2.0 (the "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ *  implied.  See the License for the specific language governing
+ *  permissions and limitations under the License.
+ */
+package org.apache.aries.rsa.provider.fastbin.api;
+
+import org.fusesource.hawtdispatch.DispatchQueue;
+
+/**
+ * <p>
+ * Implemented by object which expect to be call from the execution context
+ * of a dispatch queue.
+ * </p>
+ *
+ */
+public interface Dispatched {
+
+    DispatchQueue queue();
+
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/62d835de/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/ObjectSerializationStrategy.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/ObjectSerializationStrategy.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/ObjectSerializationStrategy.java
new file mode 100644
index 0000000..c5762ca
--- /dev/null
+++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/ObjectSerializationStrategy.java
@@ -0,0 +1,73 @@
+/**
+ *  Copyright 2005-2015 Red Hat, Inc.
+ *
+ *  Red Hat licenses this file to you under the Apache License, version
+ *  2.0 (the "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ *  implied.  See the License for the specific language governing
+ *  permissions and limitations under the License.
+ */
+package org.apache.aries.rsa.provider.fastbin.api;
+
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+
+import org.apache.aries.rsa.provider.fastbin.util.ClassLoaderObjectInputStream;
+import org.fusesource.hawtbuf.DataByteArrayInputStream;
+import org.fusesource.hawtbuf.DataByteArrayOutputStream;
+
+/**
+ * <p>
+ * </p>
+ *
+ */
+public class ObjectSerializationStrategy implements SerializationStrategy {
+    public static final ObjectSerializationStrategy INSTANCE = new ObjectSerializationStrategy();
+
+    public String name() {
+        return "object";
+    }
+
+    public void encodeRequest(ClassLoader loader, Class<?>[] types, Object[] args, DataByteArrayOutputStream target) throws IOException {
+        ObjectOutputStream oos = new ObjectOutputStream(target);
+        oos.writeObject(args);
+        oos.flush();
+    }
+
+    public void decodeResponse(ClassLoader loader, Class<?> type, DataByteArrayInputStream source, AsyncCallback result) throws IOException, ClassNotFoundException {
+        ClassLoaderObjectInputStream ois = new ClassLoaderObjectInputStream(source);
+        ois.setClassLoader(loader);
+        Throwable error = (Throwable) ois.readObject();
+        Object value = ois.readObject();
+        if (error != null) {
+            result.onFailure(error);
+        } else {
+            result.onSuccess(value);
+        }
+    }
+
+    public void decodeRequest(ClassLoader loader, Class<?>[] types, DataByteArrayInputStream source, Object[] target) throws IOException, ClassNotFoundException {
+        final ClassLoaderObjectInputStream ois = new ClassLoaderObjectInputStream(source);
+        ois.setClassLoader(loader);
+        final Object[] args = (Object[]) ois.readObject();
+        if( args!=null ) {
+            System.arraycopy(args, 0, target, 0, args.length);
+        }
+    }
+
+
+    public void encodeResponse(ClassLoader loader, Class<?> type, Object value, Throwable error, DataByteArrayOutputStream target) throws IOException, ClassNotFoundException {
+        ObjectOutputStream oos = new ObjectOutputStream(target);
+        oos.writeObject(error);
+        oos.writeObject(value);
+        oos.flush();
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/62d835de/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/ProtobufSerializationStrategy.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/ProtobufSerializationStrategy.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/ProtobufSerializationStrategy.java
new file mode 100644
index 0000000..c53c65d
--- /dev/null
+++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/ProtobufSerializationStrategy.java
@@ -0,0 +1,127 @@
+/**
+ *  Copyright 2005-2015 Red Hat, Inc.
+ *
+ *  Red Hat licenses this file to you under the Apache License, version
+ *  2.0 (the "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ *  implied.  See the License for the specific language governing
+ *  permissions and limitations under the License.
+ */
+package org.apache.aries.rsa.provider.fastbin.api;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+import org.fusesource.hawtbuf.DataByteArrayInputStream;
+import org.fusesource.hawtbuf.DataByteArrayOutputStream;
+import org.fusesource.hawtbuf.proto.PBMessage;
+import org.fusesource.hawtbuf.proto.PBMessageFactory;
+
+/**
+ * <p>
+ * </p>
+ *
+ */
+public class ProtobufSerializationStrategy implements SerializationStrategy {
+
+    public static final ProtobufSerializationStrategy INSTANCE = new ProtobufSerializationStrategy();
+
+    public String name() {
+        return "protobuf";
+    }
+
+    private void encodeProtobuf(Class<?> type, Object arg, DataByteArrayOutputStream target) throws IOException {
+        if( !PBMessage.class.isAssignableFrom(type) ) {
+            throw new IllegalArgumentException("Invalid "+name()+" serialization method: method argument not a "+PBMessage.class.getName());
+        }
+        PBMessage msg = (PBMessage) arg;
+        if( msg==null ) {
+            return;
+        }
+        msg.freeze().writeUnframed(target);
+    }
+
+    private Object decodeProtobuf(Class<?> type, DataByteArrayInputStream source) throws IllegalAccessException, NoSuchFieldException, IOException {
+        if( !PBMessage.class.isAssignableFrom(type) ) {
+            throw new IllegalArgumentException("Invalid "+name()+" serialization method: method argument not a "+PBMessage.class.getName());
+        }
+
+        // Get the factory instance...
+        PBMessageFactory factory = (PBMessageFactory) type.getEnclosingClass().getField("FACTORY").get(null);
+        PBMessage msg = factory.parseUnframed(source);
+        String name = type.getName();
+        Object rc;
+        if( name.endsWith("$Getter") || name.endsWith("$Buffer") ) {
+            // Interface is ok we us giving them a read only impl.
+            rc = msg;
+        } else {
+            // They want a read/write impl.
+            rc = msg.copy();
+        }
+        return rc;
+    }
+
+    public void encodeRequest(ClassLoader loader, Class<?>[] types, Object[] args, DataByteArrayOutputStream target) throws IOException {
+        if( types.length == 0 ) {
+            return;
+        } else if( types.length == 1 ) {
+            encodeProtobuf(types[0], args[0], target);
+        } else {
+            throw new IllegalArgumentException("Invalid "+name()+" serialization method: methods must have zero or one argument.");
+        }
+    }
+
+    public void decodeRequest(ClassLoader loader, Class<?>[] types, DataByteArrayInputStream source, Object[] target) throws IOException, ClassNotFoundException, NoSuchFieldException, IllegalAccessException {
+        if( types.length == 0 ) {
+            return;
+        } else if( types.length == 1 ) {
+            target[0] = decodeProtobuf(types[0], source);
+        } else {
+            throw new IllegalArgumentException("Invalid "+name()+" serialization method: methods must have zero or one argument.");
+        }
+    }
+
+    public void encodeResponse(ClassLoader loader, Class<?> type, Object value, Throwable error, DataByteArrayOutputStream target) throws IOException, ClassNotFoundException {
+        if( error!=null ) {
+            target.writeBoolean(true);
+            target.writeUTF(error.getClass().getName());
+            target.writeUTF(error.getMessage());
+        } else {
+            target.writeBoolean(false);
+            encodeProtobuf(type, value, target);
+        }
+    }
+
+    public void decodeResponse(ClassLoader loader, Class<?> type, DataByteArrayInputStream source, AsyncCallback result) throws IOException, ClassNotFoundException, NoSuchFieldException, IllegalAccessException, NoSuchMethodException, InvocationTargetException, InstantiationException {
+        if( source.readBoolean() ) {
+            String className = source.readUTF();
+            String message = source.readUTF();
+
+            Throwable error;
+            try {
+                // try to build the exception...
+                Constructor<?> ctr = loader.loadClass(className).getConstructor(new Class[]{String.class});
+                error = (Throwable) ctr.newInstance(message);
+            } catch (Throwable e) {
+                // fallback to something simple..
+                error = new RuntimeException(className+": "+message);
+            }
+            result.onFailure(error);
+
+        } else {
+            result.onSuccess(decodeProtobuf(type, source));
+        }
+
+    }
+
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/62d835de/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/Serialization.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/Serialization.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/Serialization.java
new file mode 100644
index 0000000..3650085
--- /dev/null
+++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/Serialization.java
@@ -0,0 +1,29 @@
+/**
+ *  Copyright 2005-2015 Red Hat, Inc.
+ *
+ *  Red Hat licenses this file to you under the Apache License, version
+ *  2.0 (the "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ *  implied.  See the License for the specific language governing
+ *  permissions and limitations under the License.
+ */
+package org.apache.aries.rsa.provider.fastbin.api;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.TYPE, ElementType.METHOD})
+public @interface Serialization {
+    String value();
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/62d835de/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/SerializationStrategy.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/SerializationStrategy.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/SerializationStrategy.java
new file mode 100644
index 0000000..d5ac175
--- /dev/null
+++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/api/SerializationStrategy.java
@@ -0,0 +1,38 @@
+/**
+ *  Copyright 2005-2015 Red Hat, Inc.
+ *
+ *  Red Hat licenses this file to you under the Apache License, version
+ *  2.0 (the "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ *  implied.  See the License for the specific language governing
+ *  permissions and limitations under the License.
+ */
+package org.apache.aries.rsa.provider.fastbin.api;
+
+import org.fusesource.hawtbuf.DataByteArrayInputStream;
+import org.fusesource.hawtbuf.DataByteArrayOutputStream;
+
+/**
+ * <p>
+ * </p>
+ *
+ */
+public interface SerializationStrategy {
+
+    String name();
+
+    void encodeRequest(ClassLoader loader, Class<?>[] types, Object[] args, DataByteArrayOutputStream target) throws Exception;
+
+    void decodeResponse(ClassLoader loader, Class<?> type, DataByteArrayInputStream source, AsyncCallback result) throws Exception;
+
+    void decodeRequest(ClassLoader loader, Class<?>[] types, DataByteArrayInputStream source, Object[] target) throws Exception;
+
+    void encodeResponse(ClassLoader loader, Class<?> type, Object value, Throwable error, DataByteArrayOutputStream target) throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/62d835de/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/ClientInvoker.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/ClientInvoker.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/ClientInvoker.java
new file mode 100644
index 0000000..1d04e7c
--- /dev/null
+++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/ClientInvoker.java
@@ -0,0 +1,24 @@
+/**
+ *  Copyright 2005-2015 Red Hat, Inc.
+ *
+ *  Red Hat licenses this file to you under the Apache License, version
+ *  2.0 (the "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ *  implied.  See the License for the specific language governing
+ *  permissions and limitations under the License.
+ */
+package org.apache.aries.rsa.provider.fastbin.io;
+
+import java.lang.reflect.InvocationHandler;
+
+public interface ClientInvoker extends Service {
+
+    InvocationHandler getProxy(String address, String service, ClassLoader classLoader);
+
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/62d835de/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/ProtocolCodec.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/ProtocolCodec.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/ProtocolCodec.java
new file mode 100644
index 0000000..e5ea148
--- /dev/null
+++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/ProtocolCodec.java
@@ -0,0 +1,97 @@
+/**
+ *  Copyright 2005-2015 Red Hat, Inc.
+ *
+ *  Red Hat licenses this file to you under the Apache License, version
+ *  2.0 (the "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ *  implied.  See the License for the specific language governing
+ *  permissions and limitations under the License.
+ */
+package org.apache.aries.rsa.provider.fastbin.io;
+
+import java.io.IOException;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+
+
+
+/**
+ * Interface to encode and decode commands in and out of a a non blocking channel.
+ *
+ */
+public interface ProtocolCodec {
+
+    ///////////////////////////////////////////////////////////////////
+    //
+    // Methods related with reading from the channel
+    //
+    ///////////////////////////////////////////////////////////////////
+
+    /**
+     * @param channel
+     */
+    public void setReadableByteChannel(ReadableByteChannel channel);
+
+    /**
+     * Non-blocking channel based decoding.
+     * 
+     * @return
+     * @throws IOException
+     */
+    Object read() throws IOException;
+
+    /**
+     * @return The number of bytes received.
+     */
+    public long getReadCounter();
+
+
+    ///////////////////////////////////////////////////////////////////
+    //
+    // Methods related with writing to the channel
+    //
+    ///////////////////////////////////////////////////////////////////
+
+
+    enum BufferState {
+        EMPTY,
+        WAS_EMPTY,
+        NOT_EMPTY,
+        FULL,
+    }
+
+    public void setWritableByteChannel(WritableByteChannel channel);
+
+    /**
+     * Non-blocking channel based encoding.
+     *
+     * @return true if the write completed.
+     * @throws IOException
+     */
+    BufferState write(Object value) throws IOException;
+
+    /**
+     * Attempts to complete the previous write which did not complete.
+     * @return
+     * @throws IOException
+     */
+    BufferState flush() throws IOException;
+
+    /**
+     * @return true if the codec will no accept any more writes.
+     */
+    boolean full();
+
+    /**
+     * @return The number of bytes written.
+     */
+    public long getWriteCounter();
+
+
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/62d835de/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/ServerInvoker.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/ServerInvoker.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/ServerInvoker.java
new file mode 100644
index 0000000..bc1d68c
--- /dev/null
+++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/ServerInvoker.java
@@ -0,0 +1,34 @@
+/**
+ *  Copyright 2005-2015 Red Hat, Inc.
+ *
+ *  Red Hat licenses this file to you under the Apache License, version
+ *  2.0 (the "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ *  implied.  See the License for the specific language governing
+ *  permissions and limitations under the License.
+ */
+package org.apache.aries.rsa.provider.fastbin.io;
+
+public interface ServerInvoker extends Service {
+
+    String getConnectAddress();
+
+    void registerService(String id, ServiceFactory service, ClassLoader classLoader);
+
+    void unregisterService(String id);
+
+
+    public interface ServiceFactory {
+
+        Object get();
+
+        void unget();
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/62d835de/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/Service.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/Service.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/Service.java
new file mode 100644
index 0000000..71bbd5d
--- /dev/null
+++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/Service.java
@@ -0,0 +1,52 @@
+/**
+ *  Copyright 2005-2015 Red Hat, Inc.
+ *
+ *  Red Hat licenses this file to you under the Apache License, version
+ *  2.0 (the "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ *  implied.  See the License for the specific language governing
+ *  permissions and limitations under the License.
+ */
+package org.apache.aries.rsa.provider.fastbin.io;
+
+
+/**
+ * The core lifecyle interface for ActiveMQ components.
+ *
+ * @version $Revision: 1.1 $
+ */
+public interface Service {
+
+    /**
+     * Starts the service.  No guarantee is given that the service has fully started
+     * by the time this method returns.
+     */
+    void start() throws Exception;
+
+    /**
+     * Starts the service.  Executes the onComplete runnable once the service has fully started up.
+     *
+     * @param onComplete my be set to null if not interested in a callback.
+     */
+    void start(Runnable onComplete) throws Exception;
+
+    /**
+     * Stops the service.  No guarantee is given that the service has fully stopped
+     * by the time this method returns.
+     */
+    void stop();
+
+    /**
+     * Stops the service.  Executes the onComplete runnable once the service has fully stopped.
+     *
+     * @param onComplete my be set to null if not interested in a callback.
+     */
+    void stop(Runnable onComplete);
+
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/62d835de/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/Transport.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/Transport.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/Transport.java
new file mode 100644
index 0000000..f41b478
--- /dev/null
+++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/Transport.java
@@ -0,0 +1,95 @@
+/**
+ *  Copyright 2005-2015 Red Hat, Inc.
+ *
+ *  Red Hat licenses this file to you under the Apache License, version
+ *  2.0 (the "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ *  implied.  See the License for the specific language governing
+ *  permissions and limitations under the License.
+ */
+package org.apache.aries.rsa.provider.fastbin.io;
+
+import org.apache.aries.rsa.provider.fastbin.api.Dispatched;
+import org.fusesource.hawtdispatch.DispatchQueue;
+
+/**
+ * Represents an abstract connection.  It can be a client side or server side connection.
+ * 
+ */
+public interface Transport extends Service, Dispatched {
+
+
+    boolean full();
+
+    /**
+     * A one way asynchronous send of a command.  Only sent if the the transport is not full.
+     * 
+     * @param command
+     * @return true if the command was accepted.
+     */
+    boolean offer(Object command);
+
+    /**
+     * Returns the current transport listener
+     *
+     * @return
+     */
+    TransportListener getTransportListener();
+
+    /**
+     * Registers an inbound command listener
+     *
+     * @param commandListener
+     */
+    void setTransportListener(TransportListener commandListener);
+
+    /**
+     * Sets the dispatch queue used by the transport
+     *
+     * @param queue
+     */
+    void setDispatchQueue(DispatchQueue queue);
+
+    /**
+     * suspend delivery of commands.
+     */
+    void suspendRead();
+
+    /**
+     * resume delivery of commands.
+     */
+    void resumeRead();
+
+    /**
+     * @return the remote address for this connection
+     */
+    String getRemoteAddress();
+
+    /**
+     * @return true if the transport is disposed
+     */
+    boolean isDisposed();
+    
+    /**
+     * @return true if the transport is connected
+     */
+    boolean isConnected();
+    
+    /**
+     * @return The protocol codec for the transport.
+     */
+    ProtocolCodec getProtocolCodec();
+
+    /**
+     * Sets the protocol codec for the transport
+     * @param protocolCodec
+     */
+    void setProtocolCodec(ProtocolCodec protocolCodec);
+
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/62d835de/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/TransportAcceptListener.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/TransportAcceptListener.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/TransportAcceptListener.java
new file mode 100644
index 0000000..247978c
--- /dev/null
+++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/TransportAcceptListener.java
@@ -0,0 +1,31 @@
+/**
+ *  Copyright 2005-2015 Red Hat, Inc.
+ *
+ *  Red Hat licenses this file to you under the Apache License, version
+ *  2.0 (the "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ *  implied.  See the License for the specific language governing
+ *  permissions and limitations under the License.
+ */
+package org.apache.aries.rsa.provider.fastbin.io;
+
+
+import org.apache.aries.rsa.provider.fastbin.tcp.TcpTransport;
+
+/**
+ * Implemented by object that need to get injected by
+ *
+ */
+public interface TransportAcceptListener {
+    
+    void onAccept(TransportServer transportServer, TcpTransport transport);
+    
+    void onAcceptError(TransportServer transportServer, Exception error);
+
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/62d835de/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/TransportListener.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/TransportListener.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/TransportListener.java
new file mode 100644
index 0000000..4b16bde
--- /dev/null
+++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/TransportListener.java
@@ -0,0 +1,55 @@
+/**
+ *  Copyright 2005-2015 Red Hat, Inc.
+ *
+ *  Red Hat licenses this file to you under the Apache License, version
+ *  2.0 (the "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ *  implied.  See the License for the specific language governing
+ *  permissions and limitations under the License.
+ */
+package org.apache.aries.rsa.provider.fastbin.io;
+
+import java.io.IOException;
+
+
+/**
+ * An asynchronous listener of commands
+ *
+ */
+public interface TransportListener {
+    
+    /**
+     * called to process a command
+     * @param command
+     */
+    void onTransportCommand(Transport transport, Object command);
+
+    /**
+     * transport can now accept more commands for transmission. 
+     */
+    void onRefill(Transport transport);
+
+    /**
+     * An unrecoverable exception has occured on the transport
+     * @param error
+     */
+    void onTransportFailure(Transport transport, IOException error);
+    
+    /**
+     * The transport has been connected.
+     */
+    public void onTransportConnected(Transport transport);
+
+    /**
+     * The transport has suffered a disconnection from
+     * which it hopes to recover
+     */
+    public void onTransportDisconnected(Transport transport);
+
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/62d835de/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/TransportServer.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/TransportServer.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/TransportServer.java
new file mode 100644
index 0000000..e8f620e
--- /dev/null
+++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/TransportServer.java
@@ -0,0 +1,73 @@
+/**
+ *  Copyright 2005-2015 Red Hat, Inc.
+ *
+ *  Red Hat licenses this file to you under the Apache License, version
+ *  2.0 (the "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ *  implied.  See the License for the specific language governing
+ *  permissions and limitations under the License.
+ */
+package org.apache.aries.rsa.provider.fastbin.io;
+
+import java.net.InetSocketAddress;
+
+import org.fusesource.hawtdispatch.DispatchQueue;
+
+/**
+ * A TransportServer asynchronously accepts {@see Transport} objects and then
+ * delivers those objects to a {@see TransportAcceptListener}.
+ * 
+ * @version $Revision: 1.4 $
+ */
+public interface TransportServer extends Service {
+
+    /**
+     * Registers an {@see TransportAcceptListener} which is notified of accepted
+     * channels.
+     * 
+     * @param acceptListener
+     */
+    void setAcceptListener(TransportAcceptListener acceptListener);
+
+    String getBoundAddress();
+
+    String getConnectAddress();
+
+    /**
+     * @return The socket address that this transport is accepting connections
+     *         on or null if this does not or is not currently accepting
+     *         connections on a socket.
+     */
+    InetSocketAddress getSocketAddress();
+
+    /**
+     * Returns the dispatch queue used by the transport
+     *
+     * @return
+     */
+    DispatchQueue getDispatchQueue();
+
+    /**
+     * Sets the dispatch queue used by the transport
+     *
+     * @param queue
+     */
+    void setDispatchQueue(DispatchQueue queue);
+
+    /**
+     * suspend accepting new transports
+     */
+    void suspend();
+
+    /**
+     * resume accepting new transports
+     */
+    void resume();
+
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/62d835de/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AsyncInvocationStrategy.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AsyncInvocationStrategy.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AsyncInvocationStrategy.java
new file mode 100644
index 0000000..33b4b23
--- /dev/null
+++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AsyncInvocationStrategy.java
@@ -0,0 +1,188 @@
+/**
+ *  Copyright 2005-2015 Red Hat, Inc.
+ *
+ *  Red Hat licenses this file to you under the Apache License, version
+ *  2.0 (the "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ *  implied.  See the License for the specific language governing
+ *  permissions and limitations under the License.
+ */
+package org.apache.aries.rsa.provider.fastbin.tcp;
+
+import java.lang.reflect.Method;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.rmi.RemoteException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.aries.rsa.provider.fastbin.api.AsyncCallback;
+import org.apache.aries.rsa.provider.fastbin.api.SerializationStrategy;
+import org.fusesource.hawtbuf.DataByteArrayInputStream;
+import org.fusesource.hawtbuf.DataByteArrayOutputStream;
+import org.fusesource.hawtdispatch.Dispatch;
+import org.fusesource.hawtdispatch.DispatchQueue;
+
+/**
+ * <p>
+ * </p>
+ *
+ */
+public class AsyncInvocationStrategy implements InvocationStrategy {
+
+    public static final AsyncInvocationStrategy INSTANCE = new AsyncInvocationStrategy();
+
+    static public boolean isAsyncMethod(Method method) {
+        Class<?>[] types = method.getParameterTypes();
+        return types.length != 0 && types[types.length - 1] == AsyncCallback.class;
+    }
+
+
+    private class AsyncResponseFuture implements ResponseFuture {
+
+        private final ClassLoader loader;
+        private final Method method;
+        private final AsyncCallback callback;
+        private final SerializationStrategy serializationStrategy;
+        private final DispatchQueue queue;
+
+        public AsyncResponseFuture(ClassLoader loader, Method method, AsyncCallback callback, SerializationStrategy serializationStrategy, DispatchQueue queue) {
+            this.loader = loader;
+            this.method = method;
+            this.callback = callback;
+            this.serializationStrategy = serializationStrategy;
+            this.queue = queue;
+        }
+
+        public void set(final DataByteArrayInputStream source) {
+            if( queue!=null ) {
+                queue.execute(new Runnable() {
+                    public void run() {
+                        decodeIt(source);
+                    }
+                });
+            } else {
+                decodeIt(source);
+            }
+        }
+
+        private void decodeIt(DataByteArrayInputStream source) {
+            try {
+                serializationStrategy.decodeResponse(loader, getResultType(method), source, callback);
+            } catch (Throwable e) {
+                e.printStackTrace();
+            }
+        }
+
+        public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+            // TODO: we could store the timeout so we can time out the async request...
+            return null;
+        }
+
+        @Override
+        public void fail(Throwable throwable) {
+            callback.onFailure(throwable);
+        }
+    }
+
+    public ResponseFuture request(SerializationStrategy serializationStrategy, ClassLoader loader, Method method, Object[] args, DataByteArrayOutputStream target) throws Exception {
+        if(!isAsyncMethod(method)) {
+            throw new IllegalArgumentException("Invalid async method declaration: last argument is not a RequestCallback");
+        }
+
+        Class[] new_types = payloadTypes(method);
+        Object[] new_args = new Object[args.length-1];
+        System.arraycopy(args, 0, new_args, 0, new_args.length);
+
+        serializationStrategy.encodeRequest(loader, new_types, new_args, target);
+
+        return new AsyncResponseFuture(loader, method, (AsyncCallback) args[args.length-1], serializationStrategy, Dispatch.getCurrentQueue());
+    }
+
+    static private Class<?>[] payloadTypes(Method method) {
+        Class<?>[] types = method.getParameterTypes();
+        Class<?>[] new_types = new Class<?>[types.length-1];
+        System.arraycopy(types, 0, new_types, 0, new_types.length);
+        return new_types;
+    }
+
+    static private Class getResultType(Method method) {
+        Type[] types = method.getGenericParameterTypes();
+        ParameterizedType t = (ParameterizedType) types[types.length-1];
+        return (Class) t.getActualTypeArguments()[0];
+    }
+
+
+    class ServiceResponse {
+
+        private final ClassLoader loader;
+        private final Method method;
+        private final DataByteArrayOutputStream responseStream;
+        private final Runnable onComplete;
+        private final SerializationStrategy serializationStrategy;
+        private final int pos;
+        // Used to protect against sending multiple responses.
+        final AtomicBoolean responded = new AtomicBoolean(false);
+
+        public ServiceResponse(ClassLoader loader, Method method, DataByteArrayOutputStream responseStream, Runnable onComplete, SerializationStrategy serializationStrategy) {
+            this.loader = loader;
+            this.method = method;
+            this.responseStream = responseStream;
+            this.onComplete = onComplete;
+            this.serializationStrategy = serializationStrategy;
+            pos = responseStream.position();
+        }
+
+        public void send(Throwable error, Object value) {
+            if( responded.compareAndSet(false, true) ) {
+                Class resultType = getResultType(method);
+                try {
+                    serializationStrategy.encodeResponse(loader, resultType, value, error, responseStream);
+                } catch (Exception e) {
+                    // we failed to encode the response.. reposition and write that error.
+                    try {
+                        responseStream.position(pos);
+                        serializationStrategy.encodeResponse(loader, resultType, value, new RemoteException(e.toString()), responseStream);
+                    } catch (Exception unexpected) {
+                        unexpected.printStackTrace();
+                    }
+                } finally {
+                    onComplete.run();
+                }
+            }
+        }
+
+
+    }
+    public void service(SerializationStrategy serializationStrategy, ClassLoader loader, Method method, Object target, DataByteArrayInputStream requestStream, final DataByteArrayOutputStream responseStream, final Runnable onComplete) {
+
+        final ServiceResponse helper = new ServiceResponse(loader, method, responseStream, onComplete, serializationStrategy);
+        try {
+
+            Object[] new_args = new Object[method.getParameterTypes().length];
+            serializationStrategy.decodeRequest(loader, payloadTypes(method), requestStream, new_args);
+            new_args[new_args.length-1] = new AsyncCallback<Object>() {
+                public void onSuccess(Object result) {
+                    helper.send(null, result);
+                }
+                public void onFailure(Throwable failure) {
+                    helper.send(failure, null);
+                }
+            };
+            method.invoke(target, new_args);
+
+        } catch (Throwable t) {
+            helper.send(t, null);
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/62d835de/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/BlockingInvocationStrategy.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/BlockingInvocationStrategy.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/BlockingInvocationStrategy.java
new file mode 100644
index 0000000..d695e4f
--- /dev/null
+++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/BlockingInvocationStrategy.java
@@ -0,0 +1,126 @@
+/**
+ *  Copyright 2005-2015 Red Hat, Inc.
+ *
+ *  Red Hat licenses this file to you under the Apache License, version
+ *  2.0 (the "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ *  implied.  See the License for the specific language governing
+ *  permissions and limitations under the License.
+ */
+package org.apache.aries.rsa.provider.fastbin.tcp;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.rmi.RemoteException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.FutureTask;
+
+import org.apache.aries.rsa.provider.fastbin.api.AsyncCallback;
+import org.apache.aries.rsa.provider.fastbin.api.SerializationStrategy;
+import org.fusesource.hawtbuf.DataByteArrayInputStream;
+import org.fusesource.hawtbuf.DataByteArrayOutputStream;
+import org.fusesource.hawtdispatch.Dispatch;
+
+/**
+ * <p>
+ * </p>
+ *
+ */
+public class BlockingInvocationStrategy implements InvocationStrategy {
+
+    public static final BlockingInvocationStrategy INSTANCE = new BlockingInvocationStrategy();
+
+    private static final Callable<Object> EMPTY_CALLABLE = new Callable<Object>() {
+        public Object call() {
+            return null;
+        }
+    };
+
+    private class BlockingResponseFuture extends FutureTask<Object> implements ResponseFuture, AsyncCallback {
+
+        private final ClassLoader loader;
+        private final Method method;
+        private final SerializationStrategy serializationStrategy;
+
+        public BlockingResponseFuture(ClassLoader loader, Method method, SerializationStrategy serializationStrategy) {
+            super(EMPTY_CALLABLE);
+            this.loader = loader;
+            this.method = method;
+            this.serializationStrategy = serializationStrategy;
+        }
+
+        public void set(DataByteArrayInputStream source) throws IOException, ClassNotFoundException {
+            try {
+                serializationStrategy.decodeResponse(loader, method.getReturnType(), source, this);
+            } catch (Throwable e) {
+                super.setException(e);
+            }
+        }
+
+        public void fail(Throwable failure) {
+            super.setException(failure);
+        }
+
+        public void onSuccess(Object result) {
+            super.set(result);
+        }
+
+        public void onFailure(Throwable failure) {
+            super.setException(failure);
+        }
+    }
+
+    public ResponseFuture request(SerializationStrategy serializationStrategy, ClassLoader loader, Method method, Object[] args, DataByteArrayOutputStream target) throws Exception {
+
+        assert Dispatch.getCurrentQueue() == null : "You should not do blocking RPC class when executing on a dispatch queue";
+
+        serializationStrategy.encodeRequest(loader, method.getParameterTypes(), args, target);
+        return new BlockingResponseFuture(loader, method, serializationStrategy);
+    }
+
+    public void service(SerializationStrategy serializationStrategy, ClassLoader loader, Method method, Object target, DataByteArrayInputStream requestStream, DataByteArrayOutputStream responseStream, Runnable onComplete) {
+
+        int pos = responseStream.position();
+        try {
+
+            Object value = null;
+            Throwable error = null;
+
+            try {
+                Class<?>[] types = method.getParameterTypes();
+                final Object[] args = new Object[types.length];
+                serializationStrategy.decodeRequest(loader, types, requestStream, args);
+                value = method.invoke(target, args);
+            } catch (Throwable t) {
+                if (t instanceof InvocationTargetException) {
+                    error = t.getCause();
+                } else {
+                    error = t;
+                }
+            }
+
+            serializationStrategy.encodeResponse(loader, method.getReturnType(), value, error, responseStream);
+
+        } catch(Exception e) {
+
+            // we failed to encode the response.. reposition and write that error.
+            try {
+                responseStream.position(pos);
+                serializationStrategy.encodeResponse(loader, method.getReturnType(), null, new RemoteException(e.toString()), responseStream);
+            } catch (Exception unexpected) {
+                unexpected.printStackTrace();
+            }
+
+        } finally {
+            onComplete.run();
+        }
+    }
+
+}


Mime
View raw message