Updated Branches:
refs/heads/master 1559bfe07 -> ba539a12b
Use messages instead of reflection for passing values accross Channel(s)
Project: http://git-wip-us.apache.org/repos/asf/ode-jacob/repo
Commit: http://git-wip-us.apache.org/repos/asf/ode-jacob/commit/a8339de5
Tree: http://git-wip-us.apache.org/repos/asf/ode-jacob/tree/a8339de5
Diff: http://git-wip-us.apache.org/repos/asf/ode-jacob/diff/a8339de5
Branch: refs/heads/master
Commit: a8339de5ea820e38430e8d32935a7b0db5777cba
Parents: 1559bfe
Author: Hadrian Zbarcea <hzbarcea@gmail.com>
Authored: Tue Jul 30 13:56:15 2013 -0400
Committer: Hadrian Zbarcea <hzbarcea@gmail.com>
Committed: Tue Jul 30 13:56:15 2013 -0400
----------------------------------------------------------------------
src/main/java/org/apache/ode/jacob/Message.java | 74 ++++++++++++++++++++
.../org/apache/ode/jacob/MessageChannel.java | 28 ++++++++
.../java/org/apache/ode/jacob/MessageType.java | 27 +++++++
.../apache/ode/jacob/oo/ChannelListener.java | 29 +++++++-
.../java/org/apache/ode/jacob/oo/ClassUtil.java | 6 ++
.../org/apache/ode/jacob/oo/MessageHandler.java | 42 +++++++++++
.../java/org/apache/ode/jacob/oo/Synch.java | 6 +-
src/main/java/org/apache/ode/jacob/oo/Val.java | 5 +-
.../java/org/apache/ode/jacob/vpu/JacobVPU.java | 17 ++++-
.../apache/ode/jacob/examples/cell/Cell.java | 8 ++-
.../eratosthenes/NaturalNumberStream.java | 5 +-
.../jacob/examples/helloworld/HelloWorld.java | 7 +-
.../ode/jacob/examples/synch/SynchPrint.java | 5 +-
.../jacob/vpu/ProxyConstructorTimingTest.java | 5 +-
14 files changed, 251 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/a8339de5/src/main/java/org/apache/ode/jacob/Message.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/ode/jacob/Message.java b/src/main/java/org/apache/ode/jacob/Message.java
new file mode 100644
index 0000000..0eb9024
--- /dev/null
+++ b/src/main/java/org/apache/ode/jacob/Message.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.jacob;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * TODO: Document...
+ * TODO: should anything be final here? the class itself?
+ */
+
+public final class Message {
+ private Class<? extends MessageType> type;
+ private MessageChannel reply;
+ private Map<String, Object> headers;
+ private Object body;
+
+ public Message() {
+ // TODO: do we always need headers?
+ headers = new ConcurrentHashMap<String, Object>();
+ }
+ public Message(Class<? extends MessageType> type) {
+ this();
+ this.type = type;
+ }
+
+ public Class<? extends MessageType> getType() {
+ return type;
+ }
+ public void setType(Class<? extends MessageType> type) {
+ this.type = type;
+ }
+ public MessageChannel getReply() {
+ return reply;
+ }
+ public void setReply(MessageChannel reply) {
+ this.reply = reply;
+ }
+ public Map<String, Object> getHeaders() {
+ return headers;
+ }
+ public void setHeaders(Map<String, Object> headers) {
+ this.headers = headers;
+ }
+ public Object getBody() {
+ return body;
+ }
+ public void setBody(Object body) {
+ this.body = body;
+ }
+
+ public boolean containsHeader(String header) {
+ return headers.containsKey(header);
+ }
+ // TODO: add any other convenience methods like addHeader, removeHeader?
+}
http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/a8339de5/src/main/java/org/apache/ode/jacob/MessageChannel.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/ode/jacob/MessageChannel.java b/src/main/java/org/apache/ode/jacob/MessageChannel.java
new file mode 100644
index 0000000..bf7a80d
--- /dev/null
+++ b/src/main/java/org/apache/ode/jacob/MessageChannel.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.jacob;
+
+
+/**
+ * TODO: Document...
+ */
+
+public interface MessageChannel {
+ void onMessage(Message msg);
+}
http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/a8339de5/src/main/java/org/apache/ode/jacob/MessageType.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/ode/jacob/MessageType.java b/src/main/java/org/apache/ode/jacob/MessageType.java
new file mode 100644
index 0000000..a1e4fe5
--- /dev/null
+++ b/src/main/java/org/apache/ode/jacob/MessageType.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.jacob;
+
+
+/**
+ * TODO: Document...
+ */
+
+public interface MessageType {
+}
http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/a8339de5/src/main/java/org/apache/ode/jacob/oo/ChannelListener.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/ode/jacob/oo/ChannelListener.java b/src/main/java/org/apache/ode/jacob/oo/ChannelListener.java
index a8df5f9..0c607c2 100644
--- a/src/main/java/org/apache/ode/jacob/oo/ChannelListener.java
+++ b/src/main/java/org/apache/ode/jacob/oo/ChannelListener.java
@@ -19,7 +19,14 @@
package org.apache.ode.jacob.oo;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Set;
+
import org.apache.ode.jacob.JacobObject;
+import org.apache.ode.jacob.Message;
+import org.apache.ode.jacob.MessageChannel;
+import org.apache.ode.jacob.MessageType;
/**
@@ -27,5 +34,25 @@ import org.apache.ode.jacob.JacobObject;
* class <em>and</em> implement one <code>Channel</code> interface.
*/
@SuppressWarnings("serial")
-public abstract class ChannelListener extends JacobObject {
+public abstract class ChannelListener extends JacobObject implements MessageChannel {
+
+ public void onMessage(Message msg) {
+ Class<? extends MessageType> type = msg.getType();
+
+ Set<Method> methods = this.getImplementedMethods();
+ for (Method m : methods) {
+ if (type != null && type.equals(ClassUtil.getMessageType(m))) {
+ if (this instanceof ReceiveProcess) {
+ try {
+ m.invoke(((ReceiveProcess)this).getReceiver(), (Object[])msg.getBody());
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ break;
+ }
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/a8339de5/src/main/java/org/apache/ode/jacob/oo/ClassUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/ode/jacob/oo/ClassUtil.java b/src/main/java/org/apache/ode/jacob/oo/ClassUtil.java
index a7bbd03..4e5c59c 100644
--- a/src/main/java/org/apache/ode/jacob/oo/ClassUtil.java
+++ b/src/main/java/org/apache/ode/jacob/oo/ClassUtil.java
@@ -22,6 +22,8 @@ import java.lang.reflect.Method;
import java.util.HashSet;
import java.util.Set;
+import org.apache.ode.jacob.MessageType;
+
public final class ClassUtil {
public static final Method RUN_METHOD;
@@ -41,6 +43,10 @@ public final class ClassUtil {
// Utility class
}
+ public static Class<? extends MessageType> getMessageType(Method channelMethod)
{
+ MessageHandler handler = channelMethod.getAnnotation(MessageHandler.class);
+ return handler == null ? null : handler.value();
+ }
public static Set<Method> runMethodSet() {
return RUN_METHOD_SET;
}
http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/a8339de5/src/main/java/org/apache/ode/jacob/oo/MessageHandler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/ode/jacob/oo/MessageHandler.java b/src/main/java/org/apache/ode/jacob/oo/MessageHandler.java
new file mode 100644
index 0000000..b2bed2d
--- /dev/null
+++ b/src/main/java/org/apache/ode/jacob/oo/MessageHandler.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.jacob.oo;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+import org.apache.ode.jacob.MessageType;
+
+
+/**
+ * Marks a {@link Channel} method as handling a {@link Message}
+ * of a certaing {@link MessageType}
+ *
+ * @see Message#getType()
+ */
+
+@Retention(RetentionPolicy.RUNTIME)
+@Documented
+@Target({ElementType.METHOD})
+public @interface MessageHandler {
+ Class<? extends MessageType> value();
+}
http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/a8339de5/src/main/java/org/apache/ode/jacob/oo/Synch.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/ode/jacob/oo/Synch.java b/src/main/java/org/apache/ode/jacob/oo/Synch.java
index 71a04ab..057cc9a 100644
--- a/src/main/java/org/apache/ode/jacob/oo/Synch.java
+++ b/src/main/java/org/apache/ode/jacob/oo/Synch.java
@@ -18,6 +18,8 @@
*/
package org.apache.ode.jacob.oo;
+import org.apache.ode.jacob.MessageType;
+
/**
@@ -25,11 +27,11 @@ package org.apache.ode.jacob.oo;
* <p>
* It is the only allowable return type (other than "void") for JACOB objects.
*
- * @author Maciej Szefler <a href="mailto:mbs@fivesight.com">mbs</a>
*/
public interface Synch extends Channel {
+ public interface RetMessage extends MessageType {}
- public void ret();
+ @MessageHandler(RetMessage.class) public void ret();
}
http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/a8339de5/src/main/java/org/apache/ode/jacob/oo/Val.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/ode/jacob/oo/Val.java b/src/main/java/org/apache/ode/jacob/oo/Val.java
index 13fb0ec..caff565 100644
--- a/src/main/java/org/apache/ode/jacob/oo/Val.java
+++ b/src/main/java/org/apache/ode/jacob/oo/Val.java
@@ -18,13 +18,16 @@
*/
package org.apache.ode.jacob.oo;
+import org.apache.ode.jacob.MessageType;
+
/**
* Generic return-value channel type.
*/
public interface Val extends Channel {
+ public interface ValMessage extends MessageType {}
- public void val(Object retVal);
+ @MessageHandler(ValMessage.class) public void val(Object retVal);
}
http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/a8339de5/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java b/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
index f9eddf8..7857f47 100644
--- a/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
+++ b/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
@@ -26,6 +26,8 @@ import java.util.Stack;
import org.apache.ode.jacob.JacobObject;
import org.apache.ode.jacob.JacobThread;
+import org.apache.ode.jacob.Message;
+import org.apache.ode.jacob.MessageType;
import org.apache.ode.jacob.oo.Channel;
import org.apache.ode.jacob.oo.ChannelListener;
import org.apache.ode.jacob.oo.ClassUtil;
@@ -425,11 +427,19 @@ public final class JacobVPU {
stackThread();
long ctime = System.currentTimeMillis();
try {
- _method.invoke(_methodBody instanceof ReceiveProcess ?
- ((ReceiveProcess)_methodBody).getReceiver() : _methodBody, args);
+ if (_methodBody instanceof ReceiveProcess) {
+ Message msg = new Message(ClassUtil.getMessageType(_method));
+ msg.setBody(args);
+
+ ((ReceiveProcess)_methodBody).onMessage(msg);
+ // _method.invoke(((ReceiveProcess)_methodBody).getReceiver(), args);
+ } else {
+ ((Runnable)_methodBody).run();
+ }
if (synchChannel != null) {
synchChannel.ret();
}
+/*
} catch (IllegalAccessException iae) {
throw new RuntimeException("MethodNotAccessible: " + _method.getName() +
" in " + _method.getDeclaringClass().getName(), iae);
} catch (InvocationTargetException e) {
@@ -437,7 +447,8 @@ public final class JacobVPU {
throw (target instanceof RuntimeException)
? (RuntimeException) target
: new RuntimeException("ClientMethodException: " + _method.getName()
+ " in " + _methodBody.getClass().getName(), target);
- } finally {
+*/
+ } finally {
ctime = System.currentTimeMillis() - ctime;
_statistics.totalClientTimeMs += ctime;
unstackThread();
http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/a8339de5/src/test/java/org/apache/ode/jacob/examples/cell/Cell.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/ode/jacob/examples/cell/Cell.java b/src/test/java/org/apache/ode/jacob/examples/cell/Cell.java
index 0bffb85..6fb53fc 100644
--- a/src/test/java/org/apache/ode/jacob/examples/cell/Cell.java
+++ b/src/test/java/org/apache/ode/jacob/examples/cell/Cell.java
@@ -19,24 +19,28 @@
package org.apache.ode.jacob.examples.cell;
+import org.apache.ode.jacob.MessageType;
import org.apache.ode.jacob.oo.Channel;
+import org.apache.ode.jacob.oo.MessageHandler;
import org.apache.ode.jacob.oo.Val;
/**
* Channel type for a cell. The channel allows reading of and setting the values of a cell.
*/
public interface Cell extends Channel {
+ public interface ReadMessage extends MessageType {}
+ public interface WriteMessage extends MessageType {}
/**
* Read the value of the cell.
* @param replyTo channel to which the value of the cell is sent
*/
- public void read(Val replyTo);
+ @MessageHandler(ReadMessage.class) public void read(Val replyTo);
/**
* Write the value of the cell.
* @param newVal new value of the cell
*/
- public void write(Object newVal);
+ @MessageHandler(WriteMessage.class) public void write(Object newVal);
}
http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/a8339de5/src/test/java/org/apache/ode/jacob/examples/eratosthenes/NaturalNumberStream.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/ode/jacob/examples/eratosthenes/NaturalNumberStream.java
b/src/test/java/org/apache/ode/jacob/examples/eratosthenes/NaturalNumberStream.java
index 051a451..0ab7844 100644
--- a/src/test/java/org/apache/ode/jacob/examples/eratosthenes/NaturalNumberStream.java
+++ b/src/test/java/org/apache/ode/jacob/examples/eratosthenes/NaturalNumberStream.java
@@ -19,7 +19,9 @@
package org.apache.ode.jacob.examples.eratosthenes;
+import org.apache.ode.jacob.MessageType;
import org.apache.ode.jacob.oo.Channel;
+import org.apache.ode.jacob.oo.MessageHandler;
import org.apache.ode.jacob.oo.Synch;
/**
@@ -30,7 +32,8 @@ import org.apache.ode.jacob.oo.Synch;
* @author Maciej Szefler <a href="mailto:mbs@fivesight.com">mbs</a>
*/
public interface NaturalNumberStream extends Channel {
+ public interface ValMessage extends MessageType {}
- public void val(int n, Synch ret);
+ @MessageHandler(ValMessage.class) public void val(int n, Synch ret);
}
http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/a8339de5/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java b/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java
index 92dc03c..37a0fec 100644
--- a/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java
+++ b/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java
@@ -19,8 +19,10 @@
package org.apache.ode.jacob.examples.helloworld;
import org.apache.ode.jacob.JacobObject;
+import org.apache.ode.jacob.MessageType;
import org.apache.ode.jacob.examples.sequence.Sequence;
import org.apache.ode.jacob.oo.Channel;
+import org.apache.ode.jacob.oo.MessageHandler;
import org.apache.ode.jacob.oo.ReceiveProcess;
import org.apache.ode.jacob.oo.Synch;
import org.apache.ode.jacob.oo.Val;
@@ -47,7 +49,10 @@ import com.fasterxml.jackson.dataformat.smile.SmileFactory;
public class HelloWorld extends JacobObject implements Runnable {
public static interface Callback<T, R extends Channel> extends Channel {
- public void invoke(T value, R callback);
+ public interface CallbackMessage extends MessageType {}
+
+ @MessageHandler(CallbackMessage.class) public void invoke(T value, R callback);
+
}
static class ReliablePrinterProcess extends JacobObject implements Runnable {
http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/a8339de5/src/test/java/org/apache/ode/jacob/examples/synch/SynchPrint.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/ode/jacob/examples/synch/SynchPrint.java b/src/test/java/org/apache/ode/jacob/examples/synch/SynchPrint.java
index 54eda1f..4da9319 100644
--- a/src/test/java/org/apache/ode/jacob/examples/synch/SynchPrint.java
+++ b/src/test/java/org/apache/ode/jacob/examples/synch/SynchPrint.java
@@ -18,7 +18,9 @@
*/
package org.apache.ode.jacob.examples.synch;
+import org.apache.ode.jacob.MessageType;
import org.apache.ode.jacob.oo.Channel;
+import org.apache.ode.jacob.oo.MessageHandler;
import org.apache.ode.jacob.oo.Synch;
/**
@@ -28,7 +30,8 @@ import org.apache.ode.jacob.oo.Synch;
* @author Maciej Szefler <a href="mailto:mbs@fivesight.com">mbs</a>
*/
public interface SynchPrint extends Channel {
+ public interface SynchPrintMessage extends MessageType {}
- public Synch print(String msg);
+ @MessageHandler(SynchPrintMessage.class) public Synch print(String msg);
}
http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/a8339de5/src/test/java/org/apache/ode/jacob/vpu/ProxyConstructorTimingTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/ode/jacob/vpu/ProxyConstructorTimingTest.java b/src/test/java/org/apache/ode/jacob/vpu/ProxyConstructorTimingTest.java
index d1284dc..cb75def 100644
--- a/src/test/java/org/apache/ode/jacob/vpu/ProxyConstructorTimingTest.java
+++ b/src/test/java/org/apache/ode/jacob/vpu/ProxyConstructorTimingTest.java
@@ -24,8 +24,10 @@ import java.lang.reflect.Proxy;
import junit.framework.TestCase;
+import org.apache.ode.jacob.MessageType;
import org.apache.ode.jacob.oo.Channel;
import org.apache.ode.jacob.oo.ChannelProxy;
+import org.apache.ode.jacob.oo.MessageHandler;
public class ProxyConstructorTimingTest extends TestCase {
@@ -102,7 +104,8 @@ public class ProxyConstructorTimingTest extends TestCase {
}
public interface Greeter extends Channel {
- String hello(String name);
+ public interface HelloMessage extends MessageType {}
+ @MessageHandler(HelloMessage.class) String hello(String name);
}
@SuppressWarnings("serial")
|