ode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From va...@apache.org
Subject [3/6] git commit: Refactor to a Message based Continuation
Date Tue, 13 Aug 2013 14:00:06 GMT
Refactor to a Message based Continuation


Project: http://git-wip-us.apache.org/repos/asf/ode-jacob/repo
Commit: http://git-wip-us.apache.org/repos/asf/ode-jacob/commit/54b19738
Tree: http://git-wip-us.apache.org/repos/asf/ode-jacob/tree/54b19738
Diff: http://git-wip-us.apache.org/repos/asf/ode-jacob/diff/54b19738

Branch: refs/heads/master
Commit: 54b1973893baca72b38d10c760f22a4031b14e9c
Parents: 77ea811
Author: Hadrian Zbarcea <hzbarcea@gmail.com>
Authored: Fri Aug 2 16:59:35 2013 -0400
Committer: Hadrian Zbarcea <hzbarcea@gmail.com>
Committed: Fri Aug 2 16:59:35 2013 -0400

----------------------------------------------------------------------
 .../java/org/apache/ode/jacob/oo/Channel.java   | 14 +++-
 .../apache/ode/jacob/oo/ChannelChannelRef.java  | 39 +++++++++
 .../java/org/apache/ode/jacob/oo/ClassUtil.java | 86 +++++++++++++++++++-
 .../ode/jacob/oo/JacobObjectChannelRef.java     | 40 +++++++++
 .../org/apache/ode/jacob/soup/Continuation.java | 65 ++++++++-------
 .../soup/jackson/ContinuationSerializer.java    |  3 +-
 .../ode/jacob/vpu/ExecutionQueueImpl.java       |  3 +-
 .../java/org/apache/ode/jacob/vpu/JacobVPU.java |  4 +-
 .../apache/ode/jacob/oo/JacobOOMappingTest.java | 69 ++++++++++++++++
 9 files changed, 283 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/54b19738/src/main/java/org/apache/ode/jacob/oo/Channel.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/ode/jacob/oo/Channel.java b/src/main/java/org/apache/ode/jacob/oo/Channel.java
index ac351e0..a59c9ec 100644
--- a/src/main/java/org/apache/ode/jacob/oo/Channel.java
+++ b/src/main/java/org/apache/ode/jacob/oo/Channel.java
@@ -21,8 +21,18 @@ package org.apache.ode.jacob.oo;
 import java.io.Serializable;
 
 /**
- * Marker interface extended by channel interfaces.
- * TODO: evaluate if an annotation, although slower, would offer a more elegant approach
+ *  Marker interface extended by channel interfaces.
+ *  TODO: Document how the Channel interface could be extended.
+ *   - a concrete Channel should be defined as an interface
+ *   - one should not overload Method names
+ *   - a concrete Channel could be further extended, but 
+ *     then it must not redeclare a parent Channel method
+ *   etc...
+ *   
+ *   Correctness of a Channel definition should be established via
+ *   static analysis and will be assumed at runtime, avoiding
+ *   unnecessary checks for situations like deep inheritance
+ *   hierarchies.
  */
 public interface Channel extends Serializable {
 }

http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/54b19738/src/main/java/org/apache/ode/jacob/oo/ChannelChannelRef.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/ode/jacob/oo/ChannelChannelRef.java b/src/main/java/org/apache/ode/jacob/oo/ChannelChannelRef.java
new file mode 100644
index 0000000..72b2bb8
--- /dev/null
+++ b/src/main/java/org/apache/ode/jacob/oo/ChannelChannelRef.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.jacob.oo;
+
+import org.apache.ode.jacob.ChannelRef;
+
+
+/**
+ * TODO: Document...
+ */
+
+public class ChannelChannelRef implements ChannelRef {
+	private final Channel ref;
+	
+	public ChannelChannelRef(Channel channel) {
+		ref = channel;
+	}
+
+	public Channel getRef() {
+		return ref;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/54b19738/src/main/java/org/apache/ode/jacob/oo/ClassUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/ode/jacob/oo/ClassUtil.java b/src/main/java/org/apache/ode/jacob/oo/ClassUtil.java
index 15c8c4b..b16e43b 100644
--- a/src/main/java/org/apache/ode/jacob/oo/ClassUtil.java
+++ b/src/main/java/org/apache/ode/jacob/oo/ClassUtil.java
@@ -22,18 +22,25 @@ import java.lang.reflect.Method;
 import java.util.HashSet;
 import java.util.Set;
 
+import org.apache.ode.jacob.ChannelRef;
 import org.apache.ode.jacob.Expression;
+import org.apache.ode.jacob.JacobObject;
 import org.apache.ode.jacob.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public final class ClassUtil {
     public static final Method RUN_METHOD;
+    public static final String RUN_METHOD_NAME = "run";
+    public static final String RUN_METHOD_ACTION = "java.lang.Runnable#run";
     private static final Set<Method> RUN_METHOD_SET = new HashSet<Method>();
+    private static final Logger LOG = LoggerFactory.getLogger(ClassUtil.class);
 
     static {
         try {
             // Resolve the {@link Runnable#run} method once statically
-            RUN_METHOD = Runnable.class.getMethod("run", new Class[]{});
+            RUN_METHOD = Runnable.class.getMethod(RUN_METHOD_NAME, new Class[]{});
             RUN_METHOD_SET.add(RUN_METHOD);
         } catch (Exception e) {
             throw new Error("Cannot resolve 'run()' method", e);
@@ -48,9 +55,60 @@ public final class ClassUtil {
     	return RUN_METHOD_SET;
     }
 
-    public static String getMessageType(Method channelMethod) {
+    public static Message createMessage(JacobObject target, Method method, Object[] args)
{
+    	Message message = new Message();
+        message.setTo(new JacobObjectChannelRef(target));
+        message.setAction(ClassUtil.getActionForMethod(method));
+        message.setBody(args);
+        return message;
+    }
+    public static String getActionForMethod(Method channelMethod) {
+    	if (channelMethod == null) {
+    		return null;
+    	}
     	MessageHandler handler = channelMethod.getAnnotation(MessageHandler.class);
-    	return handler == null ? channelMethod.getClass().getName() + "." + channelMethod.getName()
: handler.value();
+    	if (handler != null) {
+    		return handler.value();
+    	}
+    	Class<?> clazz = channelMethod.getDeclaringClass();
+    	if (Runnable.class.isAssignableFrom(clazz) 
+    		&& channelMethod.getName() == "run"
+    		&& channelMethod.getParameterTypes().length == 0) {
+    		return RUN_METHOD_ACTION;
+    	}
+    	if (!Channel.class.isAssignableFrom(clazz)) {
+    		LOG.trace("Action '{}' can only be defined for a Channel extension", channelMethod.getName());
+    		return null;
+    	}
+		LOG.trace("Probing Channel class '{}' for declaration of method '{}'", clazz.getName(),
channelMethod.getName());
+		for (Class<?> c : clazz.getInterfaces()) {
+			String action = getChannelMethodAction(c, channelMethod);
+			if (action != null) {
+				return action;
+			}
+		}
+		// ... if clazz is a Channel interface itself 
+    	return getChannelMethodAction(clazz, channelMethod);
+    }
+
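+    /**
+     * Returns the default action associated with a Channel method, defined as
+     * Class#method following a convention similar to javadoc.
+     *
+     * @param clazz the class probed for a declaration of the method
+     * @param method the method whose name and parameter types are looked up
+     * @return the action, or null if clazz is not a Channel or the method cannot be resolved on it
+     */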
+    private static String getChannelMethodAction(Class<?> clazz, Method method) {
+		if (Channel.class.isAssignableFrom(clazz)) {
+			try {
+				Method m = clazz.getMethod(method.getName(), method.getParameterTypes());
+    	    	return m.getDeclaringClass().getName() + "#" + m.getName();
+			} catch (SecurityException e) {		// ignore
+			} catch (NoSuchMethodException e) {	// ignore
+			}
+		}
+		return null;
     }
 
     public static Expression findActionMethod(final Set<Method> implementedMethods)
{
@@ -60,7 +118,7 @@ public final class ClassUtil {
 				String action = message.getAction();
 				if (Method.class.equals(type) && action != null) {
 					for (Method m : implementedMethods) {
-						if (action.equals(ClassUtil.getMessageType(m))) {
+						if (action.equals(ClassUtil.getActionForMethod(m))) {
 							return (T)m;
 						}
 					}
@@ -69,6 +127,26 @@ public final class ClassUtil {
 			}
     	};
     }
+
+    public static Expression target() {
+    	return new Expression() {
+			@SuppressWarnings("unchecked")
+			public <T> T evaluate(Message message, Class<T> type) {
+				ChannelRef ref = message.getTo();
+				if (JacobObject.class.equals(type) && ref instanceof JacobObjectChannelRef) {
+					return (T)((JacobObjectChannelRef)ref).getRef();
+				} else if (Channel.class.equals(type) && ref instanceof ChannelChannelRef) {
+					return (T)((ChannelChannelRef)ref).getRef();
+				}
+				return null;
+			}
+    	};
+    }
+
+    public static JacobObject getMessageClosure(Message message) {
+    	return target().evaluate(message, JacobObject.class);
+    }
+
     public static Set<Method> getImplementedMethods(Set<Method> methods, Class<?>
clazz) {
         // TODO: this can be optimized (some 20 times faster in my tests) by keeping a private

         //  map of interfaces to methods: Map<Class<?>, Method[]> and just do
lookups

http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/54b19738/src/main/java/org/apache/ode/jacob/oo/JacobObjectChannelRef.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/ode/jacob/oo/JacobObjectChannelRef.java b/src/main/java/org/apache/ode/jacob/oo/JacobObjectChannelRef.java
new file mode 100644
index 0000000..a7b7ee7
--- /dev/null
+++ b/src/main/java/org/apache/ode/jacob/oo/JacobObjectChannelRef.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.jacob.oo;
+
+import org.apache.ode.jacob.ChannelRef;
+import org.apache.ode.jacob.JacobObject;
+
+
+/**
+ * TODO: Document...
+ */
+
+public class JacobObjectChannelRef implements ChannelRef {
+	private final JacobObject ref;
+	
+	public JacobObjectChannelRef(JacobObject channel) {
+		ref = channel;
+	}
+
+	public JacobObject getRef() {
+		return ref;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/54b19738/src/main/java/org/apache/ode/jacob/soup/Continuation.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/ode/jacob/soup/Continuation.java b/src/main/java/org/apache/ode/jacob/soup/Continuation.java
index 0f4286f..92fa7a6 100644
--- a/src/main/java/org/apache/ode/jacob/soup/Continuation.java
+++ b/src/main/java/org/apache/ode/jacob/soup/Continuation.java
@@ -19,6 +19,8 @@
 package org.apache.ode.jacob.soup;
 
 import org.apache.ode.jacob.JacobObject;
+import org.apache.ode.jacob.Message;
+import org.apache.ode.jacob.oo.ClassUtil;
 
 import java.lang.reflect.Method;
 
@@ -29,35 +31,38 @@ import java.lang.reflect.Method;
  * @author Maciej Szefler <a href="mailto:mbs@fivesight.com">mbs</a>
  */
 public class Continuation extends ExecutionQueueObject {
-  private JacobObject _closure;
-  private Method _method;
-  private Object[] _args;
-
-  public Continuation(JacobObject target, Method method, Object[] args) {
-    _closure = target;
-    _method = method;
-    _args = args;
-  }
-
-  public JacobObject getClosure() {
-    return _closure;
-  }
-
-  public Method getMethod() {
-    return _method;
-  }
-
-  public Object[] getArgs() {
-    return _args;
-  }
-
-  public String toString () {
-    return new StringBuilder("{")
-        .append(this.getClass().getSimpleName())
-        .append(" closure=").append(_closure)
-        .append(", method=").append(_method.getName())
-        .append(", args=").append(_args)
-        .append("}").toString();
-  }
+    private final Message message;
+    private JacobObject _closure;
+    private Method _method;
+    private Object[] _args;
+
+    public Continuation(JacobObject target, Method method, Object[] args) {
+        _closure = target;
+        _method = method;
+        _args = args;
+        
+        message = ClassUtil.createMessage(target, method, args);
+    }
+
+    public Method getMethod() {
+        return _method;
+    }
+
+    public Object[] getArgs() {
+        return _args;
+    }
+
+    public Message getMessage() {
+	    return message;
+    }
+
+    public String toString () {
+        return new StringBuilder("{")
+            .append(this.getClass().getSimpleName())
+            .append(" closure=").append(_closure)
+            .append(", method=").append(_method.getName())
+            .append(", args=").append(_args)
+            .append("}").toString();
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/54b19738/src/main/java/org/apache/ode/jacob/soup/jackson/ContinuationSerializer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/ode/jacob/soup/jackson/ContinuationSerializer.java b/src/main/java/org/apache/ode/jacob/soup/jackson/ContinuationSerializer.java
index 80aaa9b..9a8c6ae 100644
--- a/src/main/java/org/apache/ode/jacob/soup/jackson/ContinuationSerializer.java
+++ b/src/main/java/org/apache/ode/jacob/soup/jackson/ContinuationSerializer.java
@@ -20,6 +20,7 @@ package org.apache.ode.jacob.soup.jackson;
 
 import java.io.IOException;
 
+import org.apache.ode.jacob.oo.ClassUtil;
 import org.apache.ode.jacob.soup.Continuation;
 
 import com.fasterxml.jackson.core.JsonGenerationException;
@@ -63,7 +64,7 @@ public class ContinuationSerializer extends StdSerializer<Continuation>
{
     private void serializeContents(Continuation value, JsonGenerator jgen,
             SerializerProvider provider) throws JsonGenerationException, IOException {
 
-        jgen.writeObjectField("target", value.getClosure());
+        jgen.writeObjectField("target", ClassUtil.getMessageClosure(value.getMessage()));
         jgen.writeStringField("method", value.getMethod().getName());
         jgen.writeObjectField("args", value.getArgs());
     }

http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/54b19738/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java b/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java
index c9b24f5..9fc1085 100644
--- a/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java
+++ b/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java
@@ -45,6 +45,7 @@ import org.apache.ode.jacob.IndexedObject;
 import org.apache.ode.jacob.JacobObject;
 import org.apache.ode.jacob.oo.Channel;
 import org.apache.ode.jacob.oo.ChannelListener;
+import org.apache.ode.jacob.oo.ClassUtil;
 import org.apache.ode.jacob.soup.Comm;
 import org.apache.ode.jacob.soup.CommChannel;
 import org.apache.ode.jacob.soup.CommGroup;
@@ -293,7 +294,7 @@ public class ExecutionQueueImpl implements ExecutionQueue {
         // Write out the reactions.
         sos.writeInt(_reactions.size());
         for (Continuation c : _reactions) {
-            sos.writeObject(c.getClosure());
+            sos.writeObject(ClassUtil.getMessageClosure(c.getMessage()));
             sos.writeUTF(c.getMethod().getName());
             sos.writeInt(c.getArgs() == null ? 0 : c.getArgs().length);
             for (int j = 0; c.getArgs() != null && j < c.getArgs().length; ++j)

http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/54b19738/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java b/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
index 491fb4b..bad5aa3 100644
--- a/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
+++ b/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
@@ -248,7 +248,7 @@ public final class JacobVPU {
         JacobThreadImpl(Continuation rqe) {
             assert rqe != null;
 
-            _methodBody = rqe.getClosure();
+            _methodBody = ClassUtil.getMessageClosure(rqe.getMessage());
             _args = rqe.getArgs();
             _source = rqe.getDescription();
             _method = rqe.getMethod();
@@ -427,7 +427,7 @@ public final class JacobVPU {
             long ctime = System.currentTimeMillis();
             try {
             	if (_methodBody instanceof ReceiveProcess) {
-            		Message msg = new Message(null, null, ClassUtil.getMessageType(_method));
+            		Message msg = new Message(null, null, ClassUtil.getActionForMethod(_method));
             		msg.setBody(args);
 
             		((ReceiveProcess)_methodBody).onMessage(msg);

http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/54b19738/src/test/java/org/apache/ode/jacob/oo/JacobOOMappingTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/ode/jacob/oo/JacobOOMappingTest.java b/src/test/java/org/apache/ode/jacob/oo/JacobOOMappingTest.java
new file mode 100644
index 0000000..daf056e
--- /dev/null
+++ b/src/test/java/org/apache/ode/jacob/oo/JacobOOMappingTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.jacob.oo;
+
+
+import java.lang.reflect.Method;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class JacobOOMappingTest {
+
+	@Test
+	public void testProcessAction() {
+		String action = ClassUtil.getActionForMethod(methodOf(TestProcess.class, "run"));
+		Assert.assertEquals("java.lang.Runnable#run", action);
+	}
+
+
+	@Test
+	public void testDefaultChannelAction() {
+		String action = ClassUtil.getActionForMethod(methodOf(TestChannel.class, "one"));
+		Assert.assertEquals("org.apache.ode.jacob.oo.JacobOOMappingTest$TestChannel#one", action);
+	}
+
+	@Test
+	public void testCustomChannelAction() {
+		String action = ClassUtil.getActionForMethod(methodOf(TestChannel.class, "two"));
+		Assert.assertEquals("TestChannel-custom", action);
+	}
+
+	private final Method methodOf(Class<?> clazz, String name) {
+        try {
+            return clazz.getMethod(name, new Class[]{});
+        } catch (Exception e) {
+        	// ignore
+        }
+        return null;
+	}
+
+	public static interface TestChannel extends Channel {
+		void one();
+		@MessageHandler("TestChannel-custom") void two();
+	}
+
+	public static class TestProcess implements Runnable {
+		public void run() {
+			// do nothing
+		}
+	}
+
+}


Mime
View raw message