Return-Path: X-Original-To: apmail-ode-commits-archive@www.apache.org Delivered-To: apmail-ode-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 28EB310EDA for ; Thu, 15 Aug 2013 21:18:27 +0000 (UTC) Received: (qmail 63157 invoked by uid 500); 15 Aug 2013 21:18:18 -0000 Delivered-To: apmail-ode-commits-archive@ode.apache.org Received: (qmail 63083 invoked by uid 500); 15 Aug 2013 21:18:08 -0000 Mailing-List: contact commits-help@ode.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ode.apache.org Delivered-To: mailing list commits@ode.apache.org Received: (qmail 63062 invoked by uid 99); 15 Aug 2013 21:18:07 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 15 Aug 2013 21:18:07 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 2B06B83767C; Thu, 15 Aug 2013 21:18:07 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vanto@apache.org To: commits@ode.apache.org Date: Thu, 15 Aug 2013 21:18:07 -0000 Message-Id: <4de44d27969447e483c73d623ed5c656@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/3] git commit: fixing the garbage collection for the calculus case. Updated Branches: refs/heads/master 4556e9bc9 -> 69d711b99 fixing the garbage collection for the calculus case. Project: http://git-wip-us.apache.org/repos/asf/ode-jacob/repo Commit: http://git-wip-us.apache.org/repos/asf/ode-jacob/commit/e820dc14 Tree: http://git-wip-us.apache.org/repos/asf/ode-jacob/tree/e820dc14 Diff: http://git-wip-us.apache.org/repos/asf/ode-jacob/diff/e820dc14 Branch: refs/heads/master Commit: e820dc141c86fa0da5314bee6d821b5ed8996ee7 Parents: 4556e9b Author: Tammo van Lessen Authored: Thu Aug 15 21:40:31 2013 +0200 Committer: Tammo van Lessen Committed: Thu Aug 15 21:40:31 2013 +0200 ---------------------------------------------------------------------- src/main/java/org/apache/ode/jacob/Jacob.java | 6 +++--- .../java/org/apache/ode/jacob/JacobThread.java | 6 +++--- .../ode/jacob/soup/ExecutionQueueObject.java | 6 +++--- .../soup/jackson/ChannelProxySerializer.java | 14 +++++--------- .../jacob/soup/jackson/ChannelRefSerializer.java | 12 ++++++++++-- .../soup/jackson/JacksonExecutionQueueImpl.java | 19 +++++++++++-------- .../ode/jacob/soup/jackson/JacobModule.java | 9 +++++---- .../apache/ode/jacob/vpu/ExecutionQueueImpl.java | 2 +- .../java/org/apache/ode/jacob/vpu/JacobVPU.java | 15 +++++++++------ .../jacob/examples/helloworld/HelloWorld.java | 16 ++++++++-------- 10 files changed, 58 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/e820dc14/src/main/java/org/apache/ode/jacob/Jacob.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/Jacob.java b/src/main/java/org/apache/ode/jacob/Jacob.java index 680c383..f963efb 100644 --- a/src/main/java/org/apache/ode/jacob/Jacob.java +++ b/src/main/java/org/apache/ode/jacob/Jacob.java @@ -71,7 +71,7 @@ public class Jacob { * @param channel * @return */ - public static CommChannel newCommChannel(Class channelType, String description) { + public static ChannelRef newCommChannel(Class channelType, String description) { return JacobVPU.activeJacobThread().newCommChannel(channelType, null, description); } @@ -103,11 +103,11 @@ public class Jacob { JacobVPU.activeJacobThread().sendMessage(message); } - public static void subscribe(boolean replicate, CommChannel channel, MessageListener messageListener) throws IllegalArgumentException { + public static void subscribe(boolean replicate, ChannelRef channel, MessageListener messageListener) throws IllegalArgumentException { JacobVPU.activeJacobThread().subscribe(replicate, channel, messageListener); } - public static void subscribe(boolean replicate, CommChannel channel, MessageListener[] messageListeners) throws IllegalArgumentException { + public static void subscribe(boolean replicate, ChannelRef channel, MessageListener[] messageListeners) throws IllegalArgumentException { JacobVPU.activeJacobThread().subscribe(replicate, channel, messageListeners); } http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/e820dc14/src/main/java/org/apache/ode/jacob/JacobThread.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/JacobThread.java b/src/main/java/org/apache/ode/jacob/JacobThread.java index 0b71422..d89168a 100644 --- a/src/main/java/org/apache/ode/jacob/JacobThread.java +++ b/src/main/java/org/apache/ode/jacob/JacobThread.java @@ -47,7 +47,7 @@ public interface JacobThread { * @param description * @return */ - public CommChannel newCommChannel(Class channelType, String creator, String description); + public ChannelRef newCommChannel(Class channelType, String creator, String description); /** * DOCUMENT ME @@ -71,8 +71,8 @@ public interface JacobThread { */ public void sendMessage(Message message); - public void subscribe(boolean replicate, CommChannel channel, MessageListener methodList) throws IllegalArgumentException; - public void subscribe(boolean replicate, CommChannel channel, MessageListener[] methodList) throws IllegalArgumentException; + public void subscribe(boolean replicate, ChannelRef channel, MessageListener methodList) throws IllegalArgumentException; + public void subscribe(boolean replicate, ChannelRef channel, MessageListener[] methodList) throws IllegalArgumentException; // OO oriented API http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/e820dc14/src/main/java/org/apache/ode/jacob/soup/ExecutionQueueObject.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/soup/ExecutionQueueObject.java b/src/main/java/org/apache/ode/jacob/soup/ExecutionQueueObject.java index 82679fc..f6a9315 100644 --- a/src/main/java/org/apache/ode/jacob/soup/ExecutionQueueObject.java +++ b/src/main/java/org/apache/ode/jacob/soup/ExecutionQueueObject.java @@ -28,7 +28,7 @@ public class ExecutionQueueObject { /** * A unique idefntifer for this object in the queue (should only be set by queue). */ - private Object _id; + private Integer _id; /** * A human-readable description of the object. @@ -46,14 +46,14 @@ public class ExecutionQueueObject { _description = description; } - public void setId(Object id) { + public void setId(Integer id) { if (_id != null) { throw new IllegalStateException("Object id already set for " + this); } _id = id; } - public Object getId() { + public Integer getId() { return _id; } http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/e820dc14/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelProxySerializer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelProxySerializer.java b/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelProxySerializer.java index c28ad23..95948db 100644 --- a/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelProxySerializer.java +++ b/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelProxySerializer.java @@ -19,12 +19,11 @@ package org.apache.ode.jacob.soup.jackson; import java.io.IOException; -import java.util.LinkedHashSet; -import java.util.Set; import org.apache.ode.jacob.oo.Channel; import org.apache.ode.jacob.oo.ChannelProxy; import org.apache.ode.jacob.soup.CommChannel; +import org.apache.ode.jacob.soup.jackson.JacksonExecutionQueueImpl.ExecutionQueueImplSerializer; import org.apache.ode.jacob.vpu.ChannelFactory; import com.fasterxml.jackson.core.JsonGenerationException; @@ -45,10 +44,11 @@ import com.fasterxml.jackson.databind.ser.std.StdSerializer; */ public class ChannelProxySerializer extends StdSerializer{ - private final Set serializedChannels = new LinkedHashSet(); + private final ExecutionQueueImplSerializer executionQueueImplSerializer; - protected ChannelProxySerializer() { + protected ChannelProxySerializer(ExecutionQueueImplSerializer executionQueueImplSerializer) { super(ChannelProxy.class); + this.executionQueueImplSerializer = executionQueueImplSerializer; } @Override @@ -79,11 +79,7 @@ public class ChannelProxySerializer extends StdSerializer{ jgen.writeNumberField("channelId", cid); // save channel id for garbage collection - serializedChannels.add(cid); - } - - public Set getSerializedChannels() { - return serializedChannels; + executionQueueImplSerializer.markChannelUsed(cid); } } http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/e820dc14/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelRefSerializer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelRefSerializer.java b/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelRefSerializer.java index 4e226f1..4144279 100644 --- a/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelRefSerializer.java +++ b/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelRefSerializer.java @@ -22,9 +22,10 @@ import java.io.IOException; import java.lang.reflect.Field; import org.apache.ode.jacob.ChannelRef; -import org.apache.ode.jacob.JacobObject; +import org.apache.ode.jacob.ChannelRef.Type; import org.apache.ode.jacob.Message; import org.apache.ode.jacob.soup.CommChannel; +import org.apache.ode.jacob.soup.jackson.JacksonExecutionQueueImpl.ExecutionQueueImplSerializer; import com.fasterxml.jackson.core.JsonGenerationException; import com.fasterxml.jackson.core.JsonGenerator; @@ -41,8 +42,11 @@ import com.fasterxml.jackson.databind.ser.std.StdSerializer; */ public class ChannelRefSerializer extends StdSerializer { - public ChannelRefSerializer() { + private final ExecutionQueueImplSerializer executionQueueImplSerializer; + + public ChannelRefSerializer(ExecutionQueueImplSerializer executionQueueImplSerializer) { super(ChannelRef.class); + this.executionQueueImplSerializer = executionQueueImplSerializer; } @Override @@ -72,6 +76,10 @@ public class ChannelRefSerializer extends StdSerializer { targetField.setAccessible(true); jgen.writeObjectField("target", targetField.get(value)); + if (value.getType() == Type.CHANNEL) { + executionQueueImplSerializer.markChannelUsed(value.getEndpoint(CommChannel.class).getId()); + } + } catch (Exception ex) { throw new RuntimeException(ex); } http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/e820dc14/src/main/java/org/apache/ode/jacob/soup/jackson/JacksonExecutionQueueImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/soup/jackson/JacksonExecutionQueueImpl.java b/src/main/java/org/apache/ode/jacob/soup/jackson/JacksonExecutionQueueImpl.java index 73f700b..1a1fe8d 100644 --- a/src/main/java/org/apache/ode/jacob/soup/jackson/JacksonExecutionQueueImpl.java +++ b/src/main/java/org/apache/ode/jacob/soup/jackson/JacksonExecutionQueueImpl.java @@ -24,6 +24,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedHashSet; import java.util.Set; import org.apache.ode.jacob.Message; @@ -59,11 +60,14 @@ public class JacksonExecutionQueueImpl extends ExecutionQueueImpl { public static class ExecutionQueueImplSerializer extends StdSerializer { - private ChannelProxySerializer channelProxySerializer; - - public ExecutionQueueImplSerializer(ChannelProxySerializer cps) { + private Set usedChannels = new LinkedHashSet(); + + public ExecutionQueueImplSerializer() { super(JacksonExecutionQueueImpl.class); - this.channelProxySerializer = cps; + } + + public void markChannelUsed(int channelId) { + usedChannels.add(channelId); } @Override @@ -88,7 +92,7 @@ public class JacksonExecutionQueueImpl extends ExecutionQueueImpl { private void serializeContents(JacksonExecutionQueueImpl value, JsonGenerator jgen, SerializerProvider provider) throws JsonGenerationException, IOException { - channelProxySerializer.getSerializedChannels().clear(); + usedChannels.clear(); // write metadata jgen.writeNumberField("objIdCounter", value._objIdCounter); @@ -111,14 +115,13 @@ public class JacksonExecutionQueueImpl extends ExecutionQueueImpl { nullgen.writeObjectField("channels", value._channels.values().toArray(new ChannelFrame[] {})); // remove unreferenced channels (and keep those which have been exported using export()). - Set referencedChannels = channelProxySerializer.getSerializedChannels(); for (Iterator i = value._channels.values().iterator(); i.hasNext();) { ChannelFrame cframe = i.next(); - if (referencedChannels.contains(cframe.getId()) || cframe.getRefCount() > 0) { + if (usedChannels.contains(cframe.getId()) || cframe.getRefCount() > 0) { // skip } else { LOG.debug("GC Channel: {}", cframe); - //i.remove(); + i.remove(); } } http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/e820dc14/src/main/java/org/apache/ode/jacob/soup/jackson/JacobModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/soup/jackson/JacobModule.java b/src/main/java/org/apache/ode/jacob/soup/jackson/JacobModule.java index 0175046..ddab9aa 100644 --- a/src/main/java/org/apache/ode/jacob/soup/jackson/JacobModule.java +++ b/src/main/java/org/apache/ode/jacob/soup/jackson/JacobModule.java @@ -24,6 +24,7 @@ import org.apache.ode.jacob.oo.Channel; import org.apache.ode.jacob.oo.ChannelProxy; import org.apache.ode.jacob.soup.jackson.JacksonExecutionQueueImpl.ExecutionQueueImplDeserializer; import org.apache.ode.jacob.soup.jackson.JacksonExecutionQueueImpl.ExecutionQueueImplSerializer; +import org.apache.ode.jacob.soup.jackson.ChannelProxySerializer; import com.fasterxml.jackson.core.Version; import com.fasterxml.jackson.databind.BeanDescription; @@ -46,11 +47,11 @@ public class JacobModule extends SimpleModule { public JacobModule() { super("jacob-module", Version.unknownVersion()); - final ChannelProxySerializer cps = new ChannelProxySerializer(); - addSerializer(ChannelProxy.class, cps); + final ExecutionQueueImplSerializer cqis = new ExecutionQueueImplSerializer(); + addSerializer(ChannelProxy.class, new ChannelProxySerializer(cqis)); addSerializer(Message.class, new MessageSerializer()); - addSerializer(JacksonExecutionQueueImpl.class, new ExecutionQueueImplSerializer(cps)); - addSerializer(ChannelRef.class, new ChannelRefSerializer()); + addSerializer(JacksonExecutionQueueImpl.class, cqis); + addSerializer(ChannelRef.class, new ChannelRefSerializer(cqis)); addDeserializer(JacksonExecutionQueueImpl.class, new ExecutionQueueImplDeserializer()); addDeserializer(Message.class, new MessageDeserializer()); addDeserializer(Channel.class, new ChannelProxyDeserializer()); http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/e820dc14/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java b/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java index c5a9e10..8e4265b 100644 --- a/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java +++ b/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java @@ -399,7 +399,7 @@ public class ExecutionQueueImpl implements ExecutionQueue { throw new IllegalArgumentException("The object " + so + " is not new!"); } - private void assignId(ExecutionQueueObject so, Object id) { + private void assignId(ExecutionQueueObject so, Integer id) { so.setId(id); } http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/e820dc14/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java b/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java index b70e5ab..d0114e6 100644 --- a/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java +++ b/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Stack; +import org.apache.ode.jacob.ChannelRef; import org.apache.ode.jacob.JacobObject; import org.apache.ode.jacob.JacobThread; import org.apache.ode.jacob.Message; @@ -294,7 +295,7 @@ public final class JacobVPU { return ret; } - public CommChannel newCommChannel(Class channelType, String creator, String description) { + public ChannelRef newCommChannel(Class channelType, String creator, String description) { CommChannel chnl = new CommChannel(channelType); chnl.setDescription(description); _executionQueue.add(chnl); @@ -302,7 +303,7 @@ public final class JacobVPU { LOG.trace(">> [{}] : new {}", _cycle, chnl); _statistics.channelsCreated++; - return chnl; + return new ChannelRef(chnl); } public String exportChannel(Channel channel) { @@ -375,7 +376,8 @@ public final class JacobVPU { _executionQueue.add(grp); } - public void subscribe(boolean replicate, CommChannel channel, MessageListener listener) { + public void subscribe(boolean replicate, ChannelRef channel, MessageListener listener) { + assert channel.getType() == ChannelRef.Type.CHANNEL; if (LOG.isTraceEnabled()) { StringBuffer msg = new StringBuffer(); msg.append(_cycle); @@ -389,13 +391,14 @@ public final class JacobVPU { _statistics.numContinuations++; CommGroup grp = new CommGroup(replicate); - CommRecv recv = new CommRecv(channel, listener); + CommRecv recv = new CommRecv(channel.getEndpoint(CommChannel.class), listener); grp.add(recv); _executionQueue.add(grp); } - public void subscribe(boolean replicate, CommChannel channel, MessageListener listeners[]) { + public void subscribe(boolean replicate, ChannelRef channel, MessageListener listeners[]) { + assert channel.getType() == ChannelRef.Type.CHANNEL; if (LOG.isTraceEnabled()) { StringBuffer msg = new StringBuffer(); msg.append(_cycle); @@ -413,7 +416,7 @@ public final class JacobVPU { CommGroup grp = new CommGroup(replicate); for (int i = 0; i < listeners.length; ++i) { - CommRecv recv = new CommRecv(channel, listeners[i]); + CommRecv recv = new CommRecv(channel.getEndpoint(CommChannel.class), listeners[i]); grp.add(recv); } _executionQueue.add(grp); http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/e820dc14/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java b/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java index 51a5a18..15587ac 100644 --- a/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java +++ b/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java @@ -221,9 +221,9 @@ public class HelloWorld extends JacobObject implements Runnable { protected void calculusHelloWorld() { // new(out) - final CommChannel out = newCommChannel(Val.class, "calculusHelloWorld-out"); + final ChannelRef out = newCommChannel(Val.class, "calculusHelloWorld-out"); // new(x) - final CommChannel x = newCommChannel(Val.class, "calculusHelloWorld-x"); + final ChannelRef x = newCommChannel(Val.class, "calculusHelloWorld-x"); // *(?out(str).!sysout(str)) subscribe(true, out, new PrinterMessageListener()); @@ -244,16 +244,16 @@ public class HelloWorld extends JacobObject implements Runnable { } static class ForwarderMessageListener implements MessageListener { - private CommChannel to; + private ChannelRef to; @JsonCreator - public ForwarderMessageListener(@JsonProperty("to") CommChannel to) { + public ForwarderMessageListener(@JsonProperty("to") ChannelRef to) { this.to = to; } @Override public void onMessage(Message msg) { - Message msg2 = new Message(new ChannelRef(to), null, msg.getAction()); + Message msg2 = new Message(to, null, msg.getAction()); msg2.setBody(msg.getBody()); sendMessage(msg2); } @@ -261,16 +261,16 @@ public class HelloWorld extends JacobObject implements Runnable { static class StringEmitterRunnable extends JacobObject implements Runnable { private String str; - private CommChannel to; + private ChannelRef to; @JsonCreator - public StringEmitterRunnable(@JsonProperty("str") String str, @JsonProperty("to") CommChannel to) { + public StringEmitterRunnable(@JsonProperty("str") String str, @JsonProperty("to") ChannelRef to) { this.str = str; this.to = to; } public void run() { - Message msg = new Message(new ChannelRef(to), null, "printHW"); + Message msg = new Message(to, null, "printHW"); msg.setBody(str); sendMessage(msg); }