Return-Path: X-Original-To: apmail-ode-commits-archive@www.apache.org Delivered-To: apmail-ode-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C152BFD2F for ; Tue, 13 Aug 2013 14:00:14 +0000 (UTC) Received: (qmail 85148 invoked by uid 500); 13 Aug 2013 14:00:14 -0000 Delivered-To: apmail-ode-commits-archive@ode.apache.org Received: (qmail 85015 invoked by uid 500); 13 Aug 2013 14:00:11 -0000 Mailing-List: contact commits-help@ode.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ode.apache.org Delivered-To: mailing list commits@ode.apache.org Received: (qmail 84694 invoked by uid 99); 13 Aug 2013 14:00:05 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 13 Aug 2013 14:00:05 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 8779A8BDA0C; Tue, 13 Aug 2013 14:00:04 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vanto@apache.org To: commits@ode.apache.org Date: Tue, 13 Aug 2013 14:00:06 -0000 Message-Id: <8107a0b93b6c4a03a13484883cb5d845@git.apache.org> In-Reply-To: <0d3401767b664236b0c4cb78296ae287@git.apache.org> References: <0d3401767b664236b0c4cb78296ae287@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/6] git commit: Refactor to a Message based Continuation Refactor to a Message based Continuation Project: http://git-wip-us.apache.org/repos/asf/ode-jacob/repo Commit: http://git-wip-us.apache.org/repos/asf/ode-jacob/commit/54b19738 Tree: http://git-wip-us.apache.org/repos/asf/ode-jacob/tree/54b19738 Diff: http://git-wip-us.apache.org/repos/asf/ode-jacob/diff/54b19738 Branch: refs/heads/master Commit: 54b1973893baca72b38d10c760f22a4031b14e9c Parents: 77ea811 Author: Hadrian Zbarcea Authored: Fri Aug 2 16:59:35 2013 -0400 Committer: Hadrian Zbarcea Committed: Fri Aug 2 16:59:35 2013 -0400 ---------------------------------------------------------------------- .../java/org/apache/ode/jacob/oo/Channel.java | 14 +++- .../apache/ode/jacob/oo/ChannelChannelRef.java | 39 +++++++++ .../java/org/apache/ode/jacob/oo/ClassUtil.java | 86 +++++++++++++++++++- .../ode/jacob/oo/JacobObjectChannelRef.java | 40 +++++++++ .../org/apache/ode/jacob/soup/Continuation.java | 65 ++++++++------- .../soup/jackson/ContinuationSerializer.java | 3 +- .../ode/jacob/vpu/ExecutionQueueImpl.java | 3 +- .../java/org/apache/ode/jacob/vpu/JacobVPU.java | 4 +- .../apache/ode/jacob/oo/JacobOOMappingTest.java | 69 ++++++++++++++++ 9 files changed, 283 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/54b19738/src/main/java/org/apache/ode/jacob/oo/Channel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/oo/Channel.java b/src/main/java/org/apache/ode/jacob/oo/Channel.java index ac351e0..a59c9ec 100644 --- a/src/main/java/org/apache/ode/jacob/oo/Channel.java +++ b/src/main/java/org/apache/ode/jacob/oo/Channel.java @@ -21,8 +21,18 @@ package org.apache.ode.jacob.oo; import java.io.Serializable; /** - * Marker interface extended by channel interfaces. - * TODO: evaluate if an annotation, although slower, would offer a more elegant approach + * Marker interface extended by channel interfaces. + * TODO: Document how the Channel interface could be extended. + * - a concrete Channel should be defined as an interface + * - one should not overload Method names + * - a concrete Channel could be further extended, but + * then it must not redeclare a parent Channel method + * etc... + * + * Correctness of Channel definition should be established via + * static analysis and will be assumed at runtime, avoiding + * unnecessary checks for situation like deep inheritance + * hierarchies. */ public interface Channel extends Serializable { } http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/54b19738/src/main/java/org/apache/ode/jacob/oo/ChannelChannelRef.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/oo/ChannelChannelRef.java b/src/main/java/org/apache/ode/jacob/oo/ChannelChannelRef.java new file mode 100644 index 0000000..72b2bb8 --- /dev/null +++ b/src/main/java/org/apache/ode/jacob/oo/ChannelChannelRef.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ode.jacob.oo; + +import org.apache.ode.jacob.ChannelRef; + + +/** + * TODO: Document... + */ + +public class ChannelChannelRef implements ChannelRef { + private final Channel ref; + + public ChannelChannelRef(Channel channel) { + ref = channel; + } + + public Channel getRef() { + return ref; + } + +} http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/54b19738/src/main/java/org/apache/ode/jacob/oo/ClassUtil.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/oo/ClassUtil.java b/src/main/java/org/apache/ode/jacob/oo/ClassUtil.java index 15c8c4b..b16e43b 100644 --- a/src/main/java/org/apache/ode/jacob/oo/ClassUtil.java +++ b/src/main/java/org/apache/ode/jacob/oo/ClassUtil.java @@ -22,18 +22,25 @@ import java.lang.reflect.Method; import java.util.HashSet; import java.util.Set; +import org.apache.ode.jacob.ChannelRef; import org.apache.ode.jacob.Expression; +import org.apache.ode.jacob.JacobObject; import org.apache.ode.jacob.Message; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public final class ClassUtil { public static final Method RUN_METHOD; + public static final String RUN_METHOD_NAME = "run"; + public static final String RUN_METHOD_ACTION = "java.lang.Runnable#run"; private static final Set RUN_METHOD_SET = new HashSet(); + private static final Logger LOG = LoggerFactory.getLogger(ClassUtil.class); static { try { // Resolve the {@link Runnable#run} method once statically - RUN_METHOD = Runnable.class.getMethod("run", new Class[]{}); + RUN_METHOD = Runnable.class.getMethod(RUN_METHOD_NAME, new Class[]{}); RUN_METHOD_SET.add(RUN_METHOD); } catch (Exception e) { throw new Error("Cannot resolve 'run()' method", e); @@ -48,9 +55,60 @@ public final class ClassUtil { return RUN_METHOD_SET; } - public static String getMessageType(Method channelMethod) { + public static Message createMessage(JacobObject target, Method method, Object[] args) { + Message message = new Message(); + message.setTo(new JacobObjectChannelRef(target)); + message.setAction(ClassUtil.getActionForMethod(method)); + message.setBody(args); + return message; + } + public static String getActionForMethod(Method channelMethod) { + if (channelMethod == null) { + return null; + } MessageHandler handler = channelMethod.getAnnotation(MessageHandler.class); - return handler == null ? channelMethod.getClass().getName() + "." + channelMethod.getName() : handler.value(); + if (handler != null) { + return handler.value(); + } + Class clazz = channelMethod.getDeclaringClass(); + if (Runnable.class.isAssignableFrom(clazz) + && channelMethod.getName() == "run" + && channelMethod.getParameterTypes().length == 0) { + return RUN_METHOD_ACTION; + } + if (!Channel.class.isAssignableFrom(clazz)) { + LOG.trace("Action '{}' can only be defined for a Channel extension", channelMethod.getName()); + return null; + } + LOG.trace("Probing Channel class '{}' for declaration of method '{}'", clazz.getName(), channelMethod.getName()); + for (Class c : clazz.getInterfaces()) { + String action = getChannelMethodAction(c, channelMethod); + if (action != null) { + return action; + } + } + // ... if clazz is a Channel interface itself + return getChannelMethodAction(clazz, channelMethod); + } + + /** + * @param clazz + * @param method + * @return + * + * The default action associated with a Channel method is defined as Class#method + * following a convention similar to javadoc. + */ + private static String getChannelMethodAction(Class clazz, Method method) { + if (Channel.class.isAssignableFrom(clazz)) { + try { + Method m = clazz.getMethod(method.getName(), method.getParameterTypes()); + return m.getDeclaringClass().getName() + "#" + m.getName(); + } catch (SecurityException e) { // ignore + } catch (NoSuchMethodException e) { // ignore + } + } + return null; } public static Expression findActionMethod(final Set implementedMethods) { @@ -60,7 +118,7 @@ public final class ClassUtil { String action = message.getAction(); if (Method.class.equals(type) && action != null) { for (Method m : implementedMethods) { - if (action.equals(ClassUtil.getMessageType(m))) { + if (action.equals(ClassUtil.getActionForMethod(m))) { return (T)m; } } @@ -69,6 +127,26 @@ public final class ClassUtil { } }; } + + public static Expression target() { + return new Expression() { + @SuppressWarnings("unchecked") + public T evaluate(Message message, Class type) { + ChannelRef ref = message.getTo(); + if (JacobObject.class.equals(type) && ref instanceof JacobObjectChannelRef) { + return (T)((JacobObjectChannelRef)ref).getRef(); + } else if (Channel.class.equals(type) && ref instanceof ChannelChannelRef) { + return (T)((ChannelChannelRef)ref).getRef(); + } + return null; + } + }; + } + + public static JacobObject getMessageClosure(Message message) { + return target().evaluate(message, JacobObject.class); + } + public static Set getImplementedMethods(Set methods, Class clazz) { // TODO: this can be optimized (some 20 times faster in my tests) by keeping a private // map of interfaces to methods: Map, Method[]> and just do lookups http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/54b19738/src/main/java/org/apache/ode/jacob/oo/JacobObjectChannelRef.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/oo/JacobObjectChannelRef.java b/src/main/java/org/apache/ode/jacob/oo/JacobObjectChannelRef.java new file mode 100644 index 0000000..a7b7ee7 --- /dev/null +++ b/src/main/java/org/apache/ode/jacob/oo/JacobObjectChannelRef.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ode.jacob.oo; + +import org.apache.ode.jacob.ChannelRef; +import org.apache.ode.jacob.JacobObject; + + +/** + * TODO: Document... + */ + +public class JacobObjectChannelRef implements ChannelRef { + private final JacobObject ref; + + public JacobObjectChannelRef(JacobObject channel) { + ref = channel; + } + + public JacobObject getRef() { + return ref; + } + +} http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/54b19738/src/main/java/org/apache/ode/jacob/soup/Continuation.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/soup/Continuation.java b/src/main/java/org/apache/ode/jacob/soup/Continuation.java index 0f4286f..92fa7a6 100644 --- a/src/main/java/org/apache/ode/jacob/soup/Continuation.java +++ b/src/main/java/org/apache/ode/jacob/soup/Continuation.java @@ -19,6 +19,8 @@ package org.apache.ode.jacob.soup; import org.apache.ode.jacob.JacobObject; +import org.apache.ode.jacob.Message; +import org.apache.ode.jacob.oo.ClassUtil; import java.lang.reflect.Method; @@ -29,35 +31,38 @@ import java.lang.reflect.Method; * @author Maciej Szefler mbs */ public class Continuation extends ExecutionQueueObject { - private JacobObject _closure; - private Method _method; - private Object[] _args; - - public Continuation(JacobObject target, Method method, Object[] args) { - _closure = target; - _method = method; - _args = args; - } - - public JacobObject getClosure() { - return _closure; - } - - public Method getMethod() { - return _method; - } - - public Object[] getArgs() { - return _args; - } - - public String toString () { - return new StringBuilder("{") - .append(this.getClass().getSimpleName()) - .append(" closure=").append(_closure) - .append(", method=").append(_method.getName()) - .append(", args=").append(_args) - .append("}").toString(); - } + private final Message message; + private JacobObject _closure; + private Method _method; + private Object[] _args; + + public Continuation(JacobObject target, Method method, Object[] args) { + _closure = target; + _method = method; + _args = args; + + message = ClassUtil.createMessage(target, method, args); + } + + public Method getMethod() { + return _method; + } + + public Object[] getArgs() { + return _args; + } + + public Message getMessage() { + return message; + } + + public String toString () { + return new StringBuilder("{") + .append(this.getClass().getSimpleName()) + .append(" closure=").append(_closure) + .append(", method=").append(_method.getName()) + .append(", args=").append(_args) + .append("}").toString(); + } } http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/54b19738/src/main/java/org/apache/ode/jacob/soup/jackson/ContinuationSerializer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/soup/jackson/ContinuationSerializer.java b/src/main/java/org/apache/ode/jacob/soup/jackson/ContinuationSerializer.java index 80aaa9b..9a8c6ae 100644 --- a/src/main/java/org/apache/ode/jacob/soup/jackson/ContinuationSerializer.java +++ b/src/main/java/org/apache/ode/jacob/soup/jackson/ContinuationSerializer.java @@ -20,6 +20,7 @@ package org.apache.ode.jacob.soup.jackson; import java.io.IOException; +import org.apache.ode.jacob.oo.ClassUtil; import org.apache.ode.jacob.soup.Continuation; import com.fasterxml.jackson.core.JsonGenerationException; @@ -63,7 +64,7 @@ public class ContinuationSerializer extends StdSerializer { private void serializeContents(Continuation value, JsonGenerator jgen, SerializerProvider provider) throws JsonGenerationException, IOException { - jgen.writeObjectField("target", value.getClosure()); + jgen.writeObjectField("target", ClassUtil.getMessageClosure(value.getMessage())); jgen.writeStringField("method", value.getMethod().getName()); jgen.writeObjectField("args", value.getArgs()); } http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/54b19738/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java b/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java index c9b24f5..9fc1085 100644 --- a/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java +++ b/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java @@ -45,6 +45,7 @@ import org.apache.ode.jacob.IndexedObject; import org.apache.ode.jacob.JacobObject; import org.apache.ode.jacob.oo.Channel; import org.apache.ode.jacob.oo.ChannelListener; +import org.apache.ode.jacob.oo.ClassUtil; import org.apache.ode.jacob.soup.Comm; import org.apache.ode.jacob.soup.CommChannel; import org.apache.ode.jacob.soup.CommGroup; @@ -293,7 +294,7 @@ public class ExecutionQueueImpl implements ExecutionQueue { // Write out the reactions. sos.writeInt(_reactions.size()); for (Continuation c : _reactions) { - sos.writeObject(c.getClosure()); + sos.writeObject(ClassUtil.getMessageClosure(c.getMessage())); sos.writeUTF(c.getMethod().getName()); sos.writeInt(c.getArgs() == null ? 0 : c.getArgs().length); for (int j = 0; c.getArgs() != null && j < c.getArgs().length; ++j) http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/54b19738/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java b/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java index 491fb4b..bad5aa3 100644 --- a/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java +++ b/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java @@ -248,7 +248,7 @@ public final class JacobVPU { JacobThreadImpl(Continuation rqe) { assert rqe != null; - _methodBody = rqe.getClosure(); + _methodBody = ClassUtil.getMessageClosure(rqe.getMessage()); _args = rqe.getArgs(); _source = rqe.getDescription(); _method = rqe.getMethod(); @@ -427,7 +427,7 @@ public final class JacobVPU { long ctime = System.currentTimeMillis(); try { if (_methodBody instanceof ReceiveProcess) { - Message msg = new Message(null, null, ClassUtil.getMessageType(_method)); + Message msg = new Message(null, null, ClassUtil.getActionForMethod(_method)); msg.setBody(args); ((ReceiveProcess)_methodBody).onMessage(msg); http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/54b19738/src/test/java/org/apache/ode/jacob/oo/JacobOOMappingTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/ode/jacob/oo/JacobOOMappingTest.java b/src/test/java/org/apache/ode/jacob/oo/JacobOOMappingTest.java new file mode 100644 index 0000000..daf056e --- /dev/null +++ b/src/test/java/org/apache/ode/jacob/oo/JacobOOMappingTest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ode.jacob.oo; + + +import java.lang.reflect.Method; + +import org.junit.Assert; +import org.junit.Test; + + +public class JacobOOMappingTest { + + @Test + public void testProcessAction() { + String action = ClassUtil.getActionForMethod(methodOf(TestProcess.class, "run")); + Assert.assertEquals("java.lang.Runnable#run", action); + } + + + @Test + public void testDefaultChannelAction() { + String action = ClassUtil.getActionForMethod(methodOf(TestChannel.class, "one")); + Assert.assertEquals("org.apache.ode.jacob.oo.JacobOOMappingTest$TestChannel#one", action); + } + + @Test + public void testCustomChannelAction() { + String action = ClassUtil.getActionForMethod(methodOf(TestChannel.class, "two")); + Assert.assertEquals("TestChannel-custom", action); + } + + private final Method methodOf(Class clazz, String name) { + try { + return clazz.getMethod(name, new Class[]{}); + } catch (Exception e) { + // ignore + } + return null; + } + + public static interface TestChannel extends Channel { + void one(); + @MessageHandler("TestChannel-custom") void two(); + } + + public static class TestProcess implements Runnable { + public void run() { + // do nothing + } + } + +}