Updated Branches:
refs/heads/master 69d711b99 -> b9dcd8963
Conversion from CommChannel to Channel added.
Project: http://git-wip-us.apache.org/repos/asf/ode-jacob/repo
Commit: http://git-wip-us.apache.org/repos/asf/ode-jacob/commit/941f6c43
Tree: http://git-wip-us.apache.org/repos/asf/ode-jacob/tree/941f6c43
Diff: http://git-wip-us.apache.org/repos/asf/ode-jacob/diff/941f6c43
Branch: refs/heads/master
Commit: 941f6c43e02e5b2cc29c53cac7602c046c6742a4
Parents: 69d711b
Author: Tammo van Lessen <vanto@apache.org>
Authored: Fri Aug 16 00:19:57 2013 +0200
Committer: Tammo van Lessen <vanto@apache.org>
Committed: Fri Aug 16 00:19:57 2013 +0200
----------------------------------------------------------------------
.../java/org/apache/ode/jacob/ChannelRef.java | 22 ++++--
src/main/java/org/apache/ode/jacob/Jacob.java | 4 +-
.../java/org/apache/ode/jacob/JacobThread.java | 2 +-
.../org/apache/ode/jacob/soup/CommChannel.java | 13 ++--
.../apache/ode/jacob/vpu/ChannelFactory.java | 1 +
.../ode/jacob/vpu/ExecutionQueueImpl.java | 2 +
.../jacob/examples/helloworld/HelloWorld.java | 6 +-
.../org/apache/ode/jacob/vpu/ChannelTest.java | 71 ++++++++++++++++++++
8 files changed, 107 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/941f6c43/src/main/java/org/apache/ode/jacob/ChannelRef.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/ode/jacob/ChannelRef.java b/src/main/java/org/apache/ode/jacob/ChannelRef.java
index 1209bcb..ea63610 100644
--- a/src/main/java/org/apache/ode/jacob/ChannelRef.java
+++ b/src/main/java/org/apache/ode/jacob/ChannelRef.java
@@ -20,7 +20,9 @@ package org.apache.ode.jacob;
import java.io.Serializable;
+import org.apache.ode.jacob.oo.Channel;
import org.apache.ode.jacob.soup.CommChannel;
+import org.apache.ode.jacob.vpu.ChannelFactory;
import org.apache.ode.jacob.vpu.JacobVPU;
@@ -30,19 +32,21 @@ import org.apache.ode.jacob.vpu.JacobVPU;
*/
public class ChannelRef implements Serializable {
- public enum Type { JACOB_OBJECT, CHANNEL, MESSAGE_LISTENER }
+ public enum Type { RUNNABLE, CHANNEL, MESSAGE_LISTENER }
private static final long serialVersionUID = 1L;
private final Type type;
private final Object target;
+ private transient Channel cachedChannel;
+
public ChannelRef(Object target) {
assert target != null;
if (target instanceof CommChannel) {
type = Type.CHANNEL;
- } else if (target instanceof JacobObject) {
- type = Type.JACOB_OBJECT;
+ } else if (target instanceof Runnable) {
+ type = Type.RUNNABLE;
} else if (target instanceof MessageListener) {
type = Type.MESSAGE_LISTENER;
} else {
@@ -66,10 +70,20 @@ public class ChannelRef implements Serializable {
@SuppressWarnings("unchecked")
public <T> T getEndpoint(Class<T> clazz) {
- if (type.equals(Type.JACOB_OBJECT) && JacobObject.class.isAssignableFrom(clazz))
{
+ if (type.equals(Type.RUNNABLE) && Runnable.class.isAssignableFrom(clazz))
{
return (T)target;
} else if (type.equals(Type.CHANNEL) && CommChannel.class.isAssignableFrom(clazz))
{
return (T)target;
+ } else if (type.equals(Type.CHANNEL) && Channel.class.isAssignableFrom(clazz))
{
+ if (cachedChannel == null) {
+ cachedChannel = ChannelFactory.createChannel((CommChannel)target, clazz);
+ }
+
+ if (!clazz.isAssignableFrom(cachedChannel.getClass())) {
+ throw new IllegalStateException("ChannelRef is already associated with a
channel of a different type");
+ }
+
+ return (T)cachedChannel;
} else if (type.equals(Type.MESSAGE_LISTENER) && MessageListener.class.isAssignableFrom(clazz))
{
return (T)target;
}
http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/941f6c43/src/main/java/org/apache/ode/jacob/Jacob.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/ode/jacob/Jacob.java b/src/main/java/org/apache/ode/jacob/Jacob.java
index eda96fe..b636994 100644
--- a/src/main/java/org/apache/ode/jacob/Jacob.java
+++ b/src/main/java/org/apache/ode/jacob/Jacob.java
@@ -71,8 +71,8 @@ public class Jacob {
* @param channel
* @return
*/
- public static ChannelRef newCommChannel(Class<?> channelType, String description)
{
- return JacobVPU.activeJacobThread().newCommChannel(channelType, null, description);
+ public static ChannelRef newCommChannel(String description) {
+ return JacobVPU.activeJacobThread().newCommChannel(description);
}
/**
http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/941f6c43/src/main/java/org/apache/ode/jacob/JacobThread.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/ode/jacob/JacobThread.java b/src/main/java/org/apache/ode/jacob/JacobThread.java
index 95a86a7..1d95c13 100644
--- a/src/main/java/org/apache/ode/jacob/JacobThread.java
+++ b/src/main/java/org/apache/ode/jacob/JacobThread.java
@@ -47,7 +47,7 @@ public interface JacobThread {
* @param description
* @return
*/
- public ChannelRef newCommChannel(Class<?> channelType, String creator, String description);
+ public ChannelRef newCommChannel(String description);
/**
* DOCUMENT ME
http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/941f6c43/src/main/java/org/apache/ode/jacob/soup/CommChannel.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/ode/jacob/soup/CommChannel.java b/src/main/java/org/apache/ode/jacob/soup/CommChannel.java
index bbf1409..5c33417 100644
--- a/src/main/java/org/apache/ode/jacob/soup/CommChannel.java
+++ b/src/main/java/org/apache/ode/jacob/soup/CommChannel.java
@@ -32,20 +32,25 @@ public class CommChannel extends ExecutionQueueObject implements Serializable
{
private Class<?> _type;
- // used for deserialization
- @SuppressWarnings("unused")
- private CommChannel() {}
+ public CommChannel() {}
public CommChannel(Class<?> type) {
_type = type;
}
+ public void setType(Class<?> type) {
+ if (_type != null && _type != type) {
+ throw new IllegalStateException("Type is already set!");
+ }
+ _type = type;
+ }
+
public Class<?> getType() {
return _type;
}
public String toString() {
- StringBuffer buf = new StringBuffer(_type.getSimpleName());
+ StringBuffer buf = new StringBuffer(_type == null ? "<unbound>" : _type.getSimpleName());
if (getDescription() != null) {
buf.append(':').append(getDescription());
}
http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/941f6c43/src/main/java/org/apache/ode/jacob/vpu/ChannelFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/ode/jacob/vpu/ChannelFactory.java b/src/main/java/org/apache/ode/jacob/vpu/ChannelFactory.java
index 7fa9e00..ab5743d 100644
--- a/src/main/java/org/apache/ode/jacob/vpu/ChannelFactory.java
+++ b/src/main/java/org/apache/ode/jacob/vpu/ChannelFactory.java
@@ -46,6 +46,7 @@ public class ChannelFactory {
InvocationHandler h = new ChannelInvocationHandler(backend);
Class<?>[] ifaces = new Class[] { ChannelProxy.class, type };
Object proxy = Proxy.newProxyInstance(ChannelProxy.class.getClassLoader(), ifaces,
h);
+ backend.setType(type);
return (Channel) proxy;
}
http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/941f6c43/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java b/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java
index d788afe..da83d31 100644
--- a/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java
+++ b/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java
@@ -98,6 +98,8 @@ public class ExecutionQueueImpl implements ExecutionQueue {
private Map<Object, LinkedList<IndexedObject>> _index = new LinkedHashMap<Object,
LinkedList<IndexedObject>>();
+ public ExecutionQueueImpl() {}
+
public ExecutionQueueImpl(ClassLoader classLoader) {
_classLoader = classLoader;
}
http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/941f6c43/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java b/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java
index 40f4440..1fa9408 100644
--- a/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java
+++ b/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java
@@ -231,10 +231,10 @@ public class HelloWorld extends JacobObject implements Runnable {
protected void calculusHelloWorld() {
// new(out)
- final ChannelRef out = newCommChannel(Val.class, "calculusHelloWorld-out");
+ final ChannelRef out = newCommChannel("calculusHelloWorld-out");
// new(x)
- final ChannelRef x = newCommChannel(Val.class, "calculusHelloWorld-x");
-
+ final ChannelRef x = newCommChannel("calculusHelloWorld-x");
+
// *(?out(str).!sysout(str))
subscribe(true, out, new PrinterMessageListener());
// *(?x(str).!out(str))
http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/941f6c43/src/test/java/org/apache/ode/jacob/vpu/ChannelTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/ode/jacob/vpu/ChannelTest.java b/src/test/java/org/apache/ode/jacob/vpu/ChannelTest.java
new file mode 100644
index 0000000..3361dfe
--- /dev/null
+++ b/src/test/java/org/apache/ode/jacob/vpu/ChannelTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.jacob.vpu;
+
+import static org.apache.ode.jacob.Jacob.newCommChannel;
+import static org.junit.Assert.*;
+import org.apache.ode.jacob.ChannelRef;
+import org.apache.ode.jacob.oo.Synch;
+import org.apache.ode.jacob.oo.Val;
+import org.apache.ode.jacob.soup.CommChannel;
+
+import org.junit.Test;
+
+public class ChannelTest {
+
+ @Test
+ public void testConnectWithInterface() {
+ JacobVPU vpu = new JacobVPU();
+ vpu.setContext(new ExecutionQueueImpl());
+
+ vpu.inject(new Runnable() {
+
+ @Override
+ public void run() {
+ ChannelRef cref = newCommChannel("unbound channel");
+ CommChannel cchannel = cref.getEndpoint(CommChannel.class);
+ assertNotNull(cchannel);
+ assertNull(cchannel.getType());
+
+ // now connect it to Val.class
+ Val val = cref.getEndpoint(Val.class);
+ assertNotNull(val);
+ assertEquals(Val.class, cchannel.getType());
+
+ // now try to associate it with a different channel interface
+ try {
+ cref.getEndpoint(Synch.class);
+ fail("we should get an IllegalStateException");
+ } catch (IllegalStateException e) {
+ assertEquals("ChannelRef is already associated with a channel of a different
type", e.getMessage());
+ }
+
+ // now try to associate with the same channel
+ Val val2 = cref.getEndpoint(Val.class);
+ assertNotNull(val2);
+ assertSame(val, val2);
+
+ }
+ });
+
+ assertEquals(true, vpu.getContext().hasReactions());
+ vpu.execute();
+ assertEquals(false, vpu.getContext().hasReactions());
+ }
+}
|