aries-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jut...@apache.org
Subject aries-rsa git commit: [ARIES-1632] fastbin does not throw an error for unknown services
Date Thu, 17 Nov 2016 10:33:09 GMT
Repository: aries-rsa
Updated Branches:
  refs/heads/master 2792d5d64 -> b87b66362


[ARIES-1632] fastbin does not throw an error for unknown services

When no service holder is available for a given ID, the server falls back to
the object serialization strategy and sends a ServiceException back to the caller.


Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/b87b6636
Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/b87b6636
Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/b87b6636

Branch: refs/heads/master
Commit: b87b6636216a353419732f19e0ad146e751098df
Parents: 2792d5d
Author: Johannes Utzig <jutzig@apache.org>
Authored: Thu Nov 17 11:08:30 2016 +0100
Committer: Johannes Utzig <jutzig@apache.org>
Committed: Thu Nov 17 11:32:24 2016 +0100

----------------------------------------------------------------------
 .../fastbin/tcp/AbstractInvocationStrategy.java |  29 +++++
 .../fastbin/tcp/InvocationStrategy.java         |  13 +++
 .../provider/fastbin/tcp/ServerInvokerImpl.java | 105 +++++++++++++------
 .../rsa/provider/fastbin/InvocationTest.java    |  77 ++++++++++++++
 4 files changed, 193 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/b87b6636/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AbstractInvocationStrategy.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AbstractInvocationStrategy.java
b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AbstractInvocationStrategy.java
index 2e9937a..c2ab04f 100644
--- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AbstractInvocationStrategy.java
+++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AbstractInvocationStrategy.java
@@ -103,10 +103,39 @@ public abstract class AbstractInvocationStrategy implements InvocationStrategy
 
     @Override
     public final void service(SerializationStrategy serializationStrategy, ClassLoader loader,
Method method, Object target, DataByteArrayInputStream requestStream, DataByteArrayOutputStream
responseStream, Runnable onComplete) {
+        if(method==null && target instanceof ServiceException) {
+            handleInvalidRequest(serializationStrategy, loader, method, target, responseStream,
onComplete);
+            return;
+        }
         doService(serializationStrategy, loader, method, target, requestStream, responseStream,
onComplete);
 
     }
 
+    protected void handleInvalidRequest(SerializationStrategy serializationStrategy, ClassLoader
loader, Method method, Object target, DataByteArrayOutputStream responseStream, Runnable onComplete)
{
+        //client made an invalid request
+        int pos = responseStream.position();
+        try {
+
+            Object value = null;
+            Throwable error = (Throwable)target;
+            serializationStrategy.encodeResponse(loader, null, value, error, responseStream);
+
+        } catch(Exception e) {
+
+            LOGGER.warn("Initial Encoding response for method "+method+" failed. Retrying",e);
+            // we failed to encode the response.. reposition and write that error.
+            try {
+                responseStream.position(pos);
+                serializationStrategy.encodeResponse(loader, null, null, new ServiceException(e.toString()),
responseStream);
+            } catch (Exception unexpected) {
+                LOGGER.error("Error while servicing "+method,unexpected);
+            }
+
+        } finally {
+            onComplete.run();
+        }
+    }
+
     /**
      * performs the actual remote call using the provided parameters
      * @param serializationStrategy the strategy to serialize the objects with

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/b87b6636/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/InvocationStrategy.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/InvocationStrategy.java
b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/InvocationStrategy.java
index d05ad88..024bc53 100644
--- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/InvocationStrategy.java
+++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/InvocationStrategy.java
@@ -33,5 +33,18 @@ public interface InvocationStrategy {
 
     public ResponseFuture request(SerializationStrategy serializationStrategy, ClassLoader
loader, Method method, Object[] args, DataByteArrayOutputStream requestStream) throws Exception;
 
+    /**
+     * handles the actual remote call.
+     * <p>
+     * if method is <code>null</code> and target is a <code>ServiceException</code> it is treated as an indication that the method lookup failed.
+     * In such a case the strategy will send the service exception to the caller
+     * @param serializationStrategy
+     * @param loader
+     * @param method
+     * @param target
+     * @param requestStream
+     * @param responseStream
+     * @param onComplete
+     */
     void service(SerializationStrategy serializationStrategy, ClassLoader loader, Method
method, Object target, DataByteArrayInputStream requestStream, DataByteArrayOutputStream responseStream,
Runnable onComplete);
 }

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/b87b6636/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ServerInvokerImpl.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ServerInvokerImpl.java
b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ServerInvokerImpl.java
index d56d64c..1dd58f9 100644
--- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ServerInvokerImpl.java
+++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ServerInvokerImpl.java
@@ -46,6 +46,7 @@ import org.fusesource.hawtbuf.DataByteArrayInputStream;
 import org.fusesource.hawtbuf.DataByteArrayOutputStream;
 import org.fusesource.hawtbuf.UTF8Buffer;
 import org.fusesource.hawtdispatch.DispatchQueue;
+import org.osgi.framework.ServiceException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -243,41 +244,83 @@ public class ServerInvokerImpl implements ServerInvoker, Dispatched
{
             final Buffer encoded_method = readBuffer(bais);
 
             final ServiceFactoryHolder holder = holders.get(service);
-            final MethodData methodData = holder.getMethodData(encoded_method);
-
-            final Object svc = holder.factory.get();
+            Runnable task = null;
+            if(holder==null) {
+                LOGGER.warn("The requested service {"+service+"} is not available");
+                task = new Runnable() {
+                    public void run() {
+
+                        final DataByteArrayOutputStream baos = new DataByteArrayOutputStream();
+                        try {
+                            baos.writeInt(0); // make space for the size field.
+                            baos.writeVarLong(correlation);
+                        } catch (IOException e) { // should not happen
+                            LOGGER.error("Failed to write to buffer",e);
+                            throw new RuntimeException(e);
+                        }
 
-            Runnable task = new Runnable() {
-                public void run() {
+                        // Lets decode the remaining args on the target's executor
+                        // to take cpu load off the
+                        BlockingInvocationStrategy strategy = new BlockingInvocationStrategy();
+                        strategy.service(ObjectSerializationStrategy.INSTANCE, getClass().getClassLoader(),
null,  new ServiceException("The requested service {"+service+"} is not available"), bais,
baos, new Runnable() {
+
+                            public void run() {
+                                final Buffer command = baos.toBuffer();
+
+                                // Update the size field.
+                                BufferEditor editor = command.buffer().bigEndianEditor();
+                                editor.writeInt(command.length);
+
+                                queue().execute(new Runnable() {
+                                    public void run() {
+                                        transport.offer(command);
+                                    }
+                                });
+                            }
+                        });
+                    }
+                };
+            }
+            final Object svc = holder==null ? null : holder.factory.get();
+            if(holder!=null)
+            {
+                final MethodData methodData = holder.getMethodData(encoded_method);
+
+
+                task = new Runnable() {
+                    public void run() {
+
+                        final DataByteArrayOutputStream baos = new DataByteArrayOutputStream();
+                        try {
+                            baos.writeInt(0); // make space for the size field.
+                            baos.writeVarLong(correlation);
+                        } catch (IOException e) { // should not happen
+                            LOGGER.error("Failed to write to buffer",e);
+                            throw new RuntimeException(e);
+                        }
 
-                    final DataByteArrayOutputStream baos = new DataByteArrayOutputStream();
-                    try {
-                        baos.writeInt(0); // make space for the size field.
-                        baos.writeVarLong(correlation);
-                    } catch (IOException e) { // should not happen
-                        throw new RuntimeException(e);
+                        // Lets decode the remaining args on the target's executor
+                        // to take cpu load off the
+                        methodData.invocationStrategy.service(methodData.serializationStrategy,
holder.loader, methodData.method, svc, bais, baos, new Runnable() {
+                            public void run() {
+                                holder.factory.unget();
+                                final Buffer command = baos.toBuffer();
+
+                                // Update the size field.
+                                BufferEditor editor = command.buffer().bigEndianEditor();
+                                editor.writeInt(command.length);
+
+                                queue().execute(new Runnable() {
+                                    public void run() {
+                                        transport.offer(command);
+                                    }
+                                });
+                            }
+                        });
                     }
+                };
 
-                    // Lets decode the remaining args on the target's executor
-                    // to take cpu load off the
-                    methodData.invocationStrategy.service(methodData.serializationStrategy,
holder.loader, methodData.method, svc, bais, baos, new Runnable() {
-                        public void run() {
-                            holder.factory.unget();
-                            final Buffer command = baos.toBuffer();
-
-                            // Update the size field.
-                            BufferEditor editor = command.buffer().bigEndianEditor();
-                            editor.writeInt(command.length);
-
-                            queue().execute(new Runnable() {
-                                public void run() {
-                                    transport.offer(command);
-                                }
-                            });
-                        }
-                    });
-                }
-            };
+            }
 
             Executor executor;
             if( svc instanceof Dispatched ) {

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/b87b6636/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/InvocationTest.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/InvocationTest.java
b/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/InvocationTest.java
index 7725375..647d8b6 100644
--- a/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/InvocationTest.java
+++ b/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/InvocationTest.java
@@ -28,6 +28,9 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -45,6 +48,7 @@ import org.apache.aries.rsa.provider.fastbin.test.StringValue;
 import org.fusesource.hawtdispatch.Dispatch;
 import org.fusesource.hawtdispatch.DispatchQueue;
 import org.junit.Test;
+import org.osgi.framework.ServiceException;
 
 public class InvocationTest {
     final static long MILLIS_IN_A_NANO = TimeUnit.MILLISECONDS.toNanos(1);
@@ -109,6 +113,57 @@ public class InvocationTest {
         }
     }
 
+    /**
+     * tests that requests to an unknown ID throw an exception instead of deadlocking the request
+     * @throws Exception
+     */
+    @Test(timeout=30*1000)
+    public void testInvokeInvalidServiceID() throws Exception {
+
+        DispatchQueue queue = Dispatch.createQueue();
+        HashMap<String, SerializationStrategy> map = new HashMap<String, SerializationStrategy>();
+        map.put("protobuf", new ProtobufSerializationStrategy());
+
+        ServerInvokerImpl server = new ServerInvokerImpl("tcp://localhost:0", queue, map);
+        server.start();
+
+        ClientInvokerImpl client = new ClientInvokerImpl(queue, map);
+        client.start();
+
+        try {
+            server.registerService("service-id", new ServerInvoker.ServiceFactory() {
+                public Object get() {
+                    return new Hello2Impl();
+                }
+                public void unget() {
+                }
+            }, Hello2Impl.class.getClassLoader());
+
+
+            InvocationHandler handler = client.getProxy(server.getConnectAddress(), "service-id-broken",
HelloImpl.class.getClassLoader());
+            Hello2 hello  = (Hello2) Proxy.newProxyInstance(Hello2Impl.class.getClassLoader(),
new Class[] { Hello2.class }, handler);
+
+            try{
+                hello.hello("Fabric");
+                fail("The service id does not exist, so this must fail");
+            } catch (ServiceException e) {
+                assertNotNull(e.getMessage());
+            }
+            try{
+                hello.helloAsync("World").get();
+                fail("The service id does not exist, so this must fail");
+            } catch (ExecutionException e) {
+                assertTrue(e.getCause() instanceof ServiceException);
+                assertNotNull(e.getCause().getMessage());
+            }
+        }
+        finally {
+            server.stop();
+            client.stop();
+        }
+    }
+
+
 
     @Test
     public void testObjectMethods() throws Exception {
@@ -619,5 +674,27 @@ public class InvocationTest {
         }
     }
 
+    public interface Hello2 {
+
+        public String hello(String name);
+
+        public Future<String> helloAsync(String name);
+    }
+
+    public class Hello2Impl implements Hello2 {
+
+        @Override
+        public String hello(String name) {
+            return "Hello "+name;
+        }
+
+        @Override
+        public Future<String> helloAsync(final String name) {
+            ExecutorService executor = Executors.newSingleThreadExecutor();
+            return executor.submit(() -> {
+                return hello(name);
+            });
+        }
+    }
 
 }


Mime
View raw message