ode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From va...@apache.org
Subject [1/3] git commit: fixing the garbage collection for the calculus case.
Date Thu, 15 Aug 2013 21:18:07 GMT
Updated Branches:
  refs/heads/master 4556e9bc9 -> 69d711b99


fixing the garbage collection for the calculus case.


Project: http://git-wip-us.apache.org/repos/asf/ode-jacob/repo
Commit: http://git-wip-us.apache.org/repos/asf/ode-jacob/commit/e820dc14
Tree: http://git-wip-us.apache.org/repos/asf/ode-jacob/tree/e820dc14
Diff: http://git-wip-us.apache.org/repos/asf/ode-jacob/diff/e820dc14

Branch: refs/heads/master
Commit: e820dc141c86fa0da5314bee6d821b5ed8996ee7
Parents: 4556e9b
Author: Tammo van Lessen <vanto@apache.org>
Authored: Thu Aug 15 21:40:31 2013 +0200
Committer: Tammo van Lessen <vanto@apache.org>
Committed: Thu Aug 15 21:40:31 2013 +0200

----------------------------------------------------------------------
 src/main/java/org/apache/ode/jacob/Jacob.java    |  6 +++---
 .../java/org/apache/ode/jacob/JacobThread.java   |  6 +++---
 .../ode/jacob/soup/ExecutionQueueObject.java     |  6 +++---
 .../soup/jackson/ChannelProxySerializer.java     | 14 +++++---------
 .../jacob/soup/jackson/ChannelRefSerializer.java | 12 ++++++++++--
 .../soup/jackson/JacksonExecutionQueueImpl.java  | 19 +++++++++++--------
 .../ode/jacob/soup/jackson/JacobModule.java      |  9 +++++----
 .../apache/ode/jacob/vpu/ExecutionQueueImpl.java |  2 +-
 .../java/org/apache/ode/jacob/vpu/JacobVPU.java  | 15 +++++++++------
 .../jacob/examples/helloworld/HelloWorld.java    | 16 ++++++++--------
 10 files changed, 58 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/e820dc14/src/main/java/org/apache/ode/jacob/Jacob.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/ode/jacob/Jacob.java b/src/main/java/org/apache/ode/jacob/Jacob.java
index 680c383..f963efb 100644
--- a/src/main/java/org/apache/ode/jacob/Jacob.java
+++ b/src/main/java/org/apache/ode/jacob/Jacob.java
@@ -71,7 +71,7 @@ public class Jacob {
      * @param channel
      * @return
      */
-    public static CommChannel newCommChannel(Class<?> channelType, String description)
{
+    public static ChannelRef newCommChannel(Class<?> channelType, String description)
{
         return JacobVPU.activeJacobThread().newCommChannel(channelType, null, description);
     }
     
@@ -103,11 +103,11 @@ public class Jacob {
         JacobVPU.activeJacobThread().sendMessage(message);
     }
 
-    public static void subscribe(boolean replicate, CommChannel channel, MessageListener
messageListener) throws IllegalArgumentException {
+    public static void subscribe(boolean replicate, ChannelRef channel, MessageListener messageListener)
throws IllegalArgumentException {
         JacobVPU.activeJacobThread().subscribe(replicate, channel, messageListener);
     }
     
-    public static void subscribe(boolean replicate, CommChannel channel, MessageListener[]
messageListeners) throws IllegalArgumentException {
+    public static void subscribe(boolean replicate, ChannelRef channel, MessageListener[]
messageListeners) throws IllegalArgumentException {
         JacobVPU.activeJacobThread().subscribe(replicate, channel, messageListeners);
     }
 

http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/e820dc14/src/main/java/org/apache/ode/jacob/JacobThread.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/ode/jacob/JacobThread.java b/src/main/java/org/apache/ode/jacob/JacobThread.java
index 0b71422..d89168a 100644
--- a/src/main/java/org/apache/ode/jacob/JacobThread.java
+++ b/src/main/java/org/apache/ode/jacob/JacobThread.java
@@ -47,7 +47,7 @@ public interface JacobThread {
      * @param description
      * @return
      */
-    public CommChannel newCommChannel(Class<?> channelType, String creator, String
description);
+    public ChannelRef newCommChannel(Class<?> channelType, String creator, String description);
     
     /**
      * DOCUMENT ME
@@ -71,8 +71,8 @@ public interface JacobThread {
      */
     public void sendMessage(Message message);
 
-    public void subscribe(boolean replicate, CommChannel channel, MessageListener methodList)
throws IllegalArgumentException;
-    public void subscribe(boolean replicate, CommChannel channel, MessageListener[] methodList)
throws IllegalArgumentException;
+    public void subscribe(boolean replicate, ChannelRef channel, MessageListener methodList)
throws IllegalArgumentException;
+    public void subscribe(boolean replicate, ChannelRef channel, MessageListener[] methodList)
throws IllegalArgumentException;
 
     // OO oriented API
     

http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/e820dc14/src/main/java/org/apache/ode/jacob/soup/ExecutionQueueObject.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/ode/jacob/soup/ExecutionQueueObject.java b/src/main/java/org/apache/ode/jacob/soup/ExecutionQueueObject.java
index 82679fc..f6a9315 100644
--- a/src/main/java/org/apache/ode/jacob/soup/ExecutionQueueObject.java
+++ b/src/main/java/org/apache/ode/jacob/soup/ExecutionQueueObject.java
@@ -28,7 +28,7 @@ public class ExecutionQueueObject {
     /**
      * A unique idefntifer for this object in the queue (should only be set by queue).
      */
-    private Object _id;
+    private Integer _id;
 
     /**
      * A human-readable description of the object.
@@ -46,14 +46,14 @@ public class ExecutionQueueObject {
         _description = description;
     }
 
-    public void setId(Object id) {
+    public void setId(Integer id) {
         if (_id != null) {
             throw new IllegalStateException("Object id already set for " + this);
         }
         _id = id;
     }
 
-    public Object getId() {
+    public Integer getId() {
         return _id;
     }
 

http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/e820dc14/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelProxySerializer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelProxySerializer.java b/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelProxySerializer.java
index c28ad23..95948db 100644
--- a/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelProxySerializer.java
+++ b/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelProxySerializer.java
@@ -19,12 +19,11 @@
 package org.apache.ode.jacob.soup.jackson;
 
 import java.io.IOException;
-import java.util.LinkedHashSet;
-import java.util.Set;
 
 import org.apache.ode.jacob.oo.Channel;
 import org.apache.ode.jacob.oo.ChannelProxy;
 import org.apache.ode.jacob.soup.CommChannel;
+import org.apache.ode.jacob.soup.jackson.JacksonExecutionQueueImpl.ExecutionQueueImplSerializer;
 import org.apache.ode.jacob.vpu.ChannelFactory;
 
 import com.fasterxml.jackson.core.JsonGenerationException;
@@ -45,10 +44,11 @@ import com.fasterxml.jackson.databind.ser.std.StdSerializer;
  */
 public class ChannelProxySerializer extends StdSerializer<ChannelProxy>{
 
-    private final Set<Integer> serializedChannels = new LinkedHashSet<Integer>();
+    private final ExecutionQueueImplSerializer executionQueueImplSerializer;
     
-    protected ChannelProxySerializer() {
+    protected ChannelProxySerializer(ExecutionQueueImplSerializer executionQueueImplSerializer)
{
         super(ChannelProxy.class);
+        this.executionQueueImplSerializer = executionQueueImplSerializer;
     }
 
     @Override
@@ -79,11 +79,7 @@ public class ChannelProxySerializer extends StdSerializer<ChannelProxy>{
         jgen.writeNumberField("channelId", cid);
 
         // save channel id for garbage collection
-        serializedChannels.add(cid);
-    }
-
-    public Set<Integer> getSerializedChannels() {
-        return serializedChannels;
+        executionQueueImplSerializer.markChannelUsed(cid);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/e820dc14/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelRefSerializer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelRefSerializer.java b/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelRefSerializer.java
index 4e226f1..4144279 100644
--- a/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelRefSerializer.java
+++ b/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelRefSerializer.java
@@ -22,9 +22,10 @@ import java.io.IOException;
 import java.lang.reflect.Field;
 
 import org.apache.ode.jacob.ChannelRef;
-import org.apache.ode.jacob.JacobObject;
+import org.apache.ode.jacob.ChannelRef.Type;
 import org.apache.ode.jacob.Message;
 import org.apache.ode.jacob.soup.CommChannel;
+import org.apache.ode.jacob.soup.jackson.JacksonExecutionQueueImpl.ExecutionQueueImplSerializer;
 
 import com.fasterxml.jackson.core.JsonGenerationException;
 import com.fasterxml.jackson.core.JsonGenerator;
@@ -41,8 +42,11 @@ import com.fasterxml.jackson.databind.ser.std.StdSerializer;
  */
 public class ChannelRefSerializer extends StdSerializer<ChannelRef> {
 
-    public ChannelRefSerializer() {
+    private final ExecutionQueueImplSerializer executionQueueImplSerializer;
+
+    public ChannelRefSerializer(ExecutionQueueImplSerializer executionQueueImplSerializer)
{
         super(ChannelRef.class);
+        this.executionQueueImplSerializer = executionQueueImplSerializer;
     }
     
     @Override
@@ -72,6 +76,10 @@ public class ChannelRefSerializer extends StdSerializer<ChannelRef>
{
             targetField.setAccessible(true);
             jgen.writeObjectField("target", targetField.get(value));
             
+            if (value.getType() == Type.CHANNEL) {
+                executionQueueImplSerializer.markChannelUsed(value.getEndpoint(CommChannel.class).getId());
+            }
+            
         } catch (Exception ex) {
             throw new RuntimeException(ex);
         }

http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/e820dc14/src/main/java/org/apache/ode/jacob/soup/jackson/JacksonExecutionQueueImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/ode/jacob/soup/jackson/JacksonExecutionQueueImpl.java
b/src/main/java/org/apache/ode/jacob/soup/jackson/JacksonExecutionQueueImpl.java
index 73f700b..1a1fe8d 100644
--- a/src/main/java/org/apache/ode/jacob/soup/jackson/JacksonExecutionQueueImpl.java
+++ b/src/main/java/org/apache/ode/jacob/soup/jackson/JacksonExecutionQueueImpl.java
@@ -24,6 +24,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashSet;
 import java.util.Set;
 
 import org.apache.ode.jacob.Message;
@@ -59,11 +60,14 @@ public class JacksonExecutionQueueImpl extends ExecutionQueueImpl {
 	
     public static class ExecutionQueueImplSerializer extends StdSerializer<JacksonExecutionQueueImpl>
{
 
-        private ChannelProxySerializer channelProxySerializer;
-
-        public ExecutionQueueImplSerializer(ChannelProxySerializer cps) {
+        private Set<Integer> usedChannels = new LinkedHashSet<Integer>();
+        
+        public ExecutionQueueImplSerializer() {
             super(JacksonExecutionQueueImpl.class);
-            this.channelProxySerializer = cps;
+        }
+        
+        public void markChannelUsed(int channelId) {
+            usedChannels.add(channelId);
         }
         
         @Override
@@ -88,7 +92,7 @@ public class JacksonExecutionQueueImpl extends ExecutionQueueImpl {
         private void serializeContents(JacksonExecutionQueueImpl value, JsonGenerator jgen,
                 SerializerProvider provider) throws JsonGenerationException, IOException
{
 
-            channelProxySerializer.getSerializedChannels().clear();
+            usedChannels.clear();
             
         	// write metadata
             jgen.writeNumberField("objIdCounter", value._objIdCounter);
@@ -111,14 +115,13 @@ public class JacksonExecutionQueueImpl extends ExecutionQueueImpl {
             nullgen.writeObjectField("channels", value._channels.values().toArray(new ChannelFrame[]
{}));
 
             // remove unreferenced channels (and keep those which have been exported using
export()).
-            Set<Integer> referencedChannels = channelProxySerializer.getSerializedChannels();
             for (Iterator<ChannelFrame> i = value._channels.values().iterator(); i.hasNext();)
{
                 ChannelFrame cframe = i.next();
-                if (referencedChannels.contains(cframe.getId()) || cframe.getRefCount() >
0) {
+                if (usedChannels.contains(cframe.getId()) || cframe.getRefCount() > 0)
{
                     // skip
                 } else {
                     LOG.debug("GC Channel: {}", cframe);
-                    //i.remove();
+                    i.remove();
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/e820dc14/src/main/java/org/apache/ode/jacob/soup/jackson/JacobModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/ode/jacob/soup/jackson/JacobModule.java b/src/main/java/org/apache/ode/jacob/soup/jackson/JacobModule.java
index 0175046..ddab9aa 100644
--- a/src/main/java/org/apache/ode/jacob/soup/jackson/JacobModule.java
+++ b/src/main/java/org/apache/ode/jacob/soup/jackson/JacobModule.java
@@ -24,6 +24,7 @@ import org.apache.ode.jacob.oo.Channel;
 import org.apache.ode.jacob.oo.ChannelProxy;
 import org.apache.ode.jacob.soup.jackson.JacksonExecutionQueueImpl.ExecutionQueueImplDeserializer;
 import org.apache.ode.jacob.soup.jackson.JacksonExecutionQueueImpl.ExecutionQueueImplSerializer;
+import org.apache.ode.jacob.soup.jackson.ChannelProxySerializer;
 
 import com.fasterxml.jackson.core.Version;
 import com.fasterxml.jackson.databind.BeanDescription;
@@ -46,11 +47,11 @@ public class JacobModule extends SimpleModule {
     public JacobModule() {
         super("jacob-module", Version.unknownVersion());
         
-        final ChannelProxySerializer cps = new ChannelProxySerializer();
-        addSerializer(ChannelProxy.class, cps);
+        final ExecutionQueueImplSerializer cqis = new ExecutionQueueImplSerializer();
+        addSerializer(ChannelProxy.class, new ChannelProxySerializer(cqis));
         addSerializer(Message.class, new MessageSerializer());
-        addSerializer(JacksonExecutionQueueImpl.class, new ExecutionQueueImplSerializer(cps));
-        addSerializer(ChannelRef.class, new ChannelRefSerializer());
+        addSerializer(JacksonExecutionQueueImpl.class, cqis);
+        addSerializer(ChannelRef.class, new ChannelRefSerializer(cqis));
         addDeserializer(JacksonExecutionQueueImpl.class, new ExecutionQueueImplDeserializer());
         addDeserializer(Message.class, new MessageDeserializer());
         addDeserializer(Channel.class, new ChannelProxyDeserializer());

http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/e820dc14/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java b/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java
index c5a9e10..8e4265b 100644
--- a/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java
+++ b/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java
@@ -399,7 +399,7 @@ public class ExecutionQueueImpl implements ExecutionQueue {
             throw new IllegalArgumentException("The object " + so + " is not new!");
     }
 
-    private void assignId(ExecutionQueueObject so, Object id) {
+    private void assignId(ExecutionQueueObject so, Integer id) {
         so.setId(id);
     }
 

http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/e820dc14/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java b/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
index b70e5ab..d0114e6 100644
--- a/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
+++ b/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Stack;
 
+import org.apache.ode.jacob.ChannelRef;
 import org.apache.ode.jacob.JacobObject;
 import org.apache.ode.jacob.JacobThread;
 import org.apache.ode.jacob.Message;
@@ -294,7 +295,7 @@ public final class JacobVPU {
             return ret;
         }
 
-        public CommChannel newCommChannel(Class<?> channelType, String creator, String
description) {
+        public ChannelRef newCommChannel(Class<?> channelType, String creator, String
description) {
             CommChannel chnl = new CommChannel(channelType);
             chnl.setDescription(description);
             _executionQueue.add(chnl);
@@ -302,7 +303,7 @@ public final class JacobVPU {
             LOG.trace(">> [{}] : new {}", _cycle, chnl);
 
             _statistics.channelsCreated++;
-            return chnl;
+            return new ChannelRef(chnl);
         }
         
         public String exportChannel(Channel channel) {
@@ -375,7 +376,8 @@ public final class JacobVPU {
             _executionQueue.add(grp);
         }
         
-        public void subscribe(boolean replicate, CommChannel channel, MessageListener listener)
{
+        public void subscribe(boolean replicate, ChannelRef channel, MessageListener listener)
{
+            assert channel.getType() == ChannelRef.Type.CHANNEL;
             if (LOG.isTraceEnabled()) {
                 StringBuffer msg = new StringBuffer();
                 msg.append(_cycle);
@@ -389,13 +391,14 @@ public final class JacobVPU {
             _statistics.numContinuations++;
 
             CommGroup grp = new CommGroup(replicate);
-            CommRecv recv = new CommRecv(channel, listener);
+            CommRecv recv = new CommRecv(channel.getEndpoint(CommChannel.class), listener);
             grp.add(recv);
 
             _executionQueue.add(grp);
         }
 
-        public void subscribe(boolean replicate, CommChannel channel, MessageListener listeners[])
{
+        public void subscribe(boolean replicate, ChannelRef channel, MessageListener listeners[])
{
+            assert channel.getType() == ChannelRef.Type.CHANNEL;
             if (LOG.isTraceEnabled()) {
                 StringBuffer msg = new StringBuffer();
                 msg.append(_cycle);
@@ -413,7 +416,7 @@ public final class JacobVPU {
 
             CommGroup grp = new CommGroup(replicate);
             for (int i = 0; i < listeners.length; ++i) {
-                CommRecv recv = new CommRecv(channel, listeners[i]);
+                CommRecv recv = new CommRecv(channel.getEndpoint(CommChannel.class), listeners[i]);
                 grp.add(recv);
             }
             _executionQueue.add(grp);

http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/e820dc14/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java b/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java
index 51a5a18..15587ac 100644
--- a/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java
+++ b/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java
@@ -221,9 +221,9 @@ public class HelloWorld extends JacobObject implements Runnable {
 
     protected void calculusHelloWorld() {
         // new(out)
-        final CommChannel out = newCommChannel(Val.class, "calculusHelloWorld-out");
+        final ChannelRef out = newCommChannel(Val.class, "calculusHelloWorld-out");
         // new(x)
-        final CommChannel x = newCommChannel(Val.class, "calculusHelloWorld-x");
+        final ChannelRef x = newCommChannel(Val.class, "calculusHelloWorld-x");
 
         // *(?out(str).!sysout(str))
         subscribe(true, out, new PrinterMessageListener());
@@ -244,16 +244,16 @@ public class HelloWorld extends JacobObject implements Runnable {
     }
     
     static class ForwarderMessageListener implements MessageListener {
-        private CommChannel to;
+        private ChannelRef to;
         
         @JsonCreator
-        public ForwarderMessageListener(@JsonProperty("to") CommChannel to) {
+        public ForwarderMessageListener(@JsonProperty("to") ChannelRef to) {
             this.to = to;
         }
 
         @Override
         public void onMessage(Message msg) {
-            Message msg2 = new Message(new ChannelRef(to), null, msg.getAction());
+            Message msg2 = new Message(to, null, msg.getAction());
             msg2.setBody(msg.getBody());
             sendMessage(msg2);
         }
@@ -261,16 +261,16 @@ public class HelloWorld extends JacobObject implements Runnable {
     
     static class StringEmitterRunnable extends JacobObject implements Runnable {
         private String str;
-        private CommChannel to;
+        private ChannelRef to;
 
         @JsonCreator
-        public StringEmitterRunnable(@JsonProperty("str") String str, @JsonProperty("to")
CommChannel to) {
+        public StringEmitterRunnable(@JsonProperty("str") String str, @JsonProperty("to")
ChannelRef to) {
             this.str = str;
             this.to = to;
         }
 
         public void run() {
-            Message msg = new Message(new ChannelRef(to), null, "printHW");
+            Message msg = new Message(to, null, "printHW");
             msg.setBody(str);
             sendMessage(msg);
         }


Mime
View raw message