Return-Path: X-Original-To: apmail-apex-commits-archive@minotaur.apache.org Delivered-To: apmail-apex-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A14F018FBA for ; Tue, 8 Mar 2016 19:12:40 +0000 (UTC) Received: (qmail 54840 invoked by uid 500); 8 Mar 2016 19:12:40 -0000 Delivered-To: apmail-apex-commits-archive@apex.apache.org Received: (qmail 54807 invoked by uid 500); 8 Mar 2016 19:12:40 -0000 Mailing-List: contact commits-help@apex.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@apex.incubator.apache.org Delivered-To: mailing list commits@apex.incubator.apache.org Received: (qmail 54798 invoked by uid 99); 8 Mar 2016 19:12:40 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 08 Mar 2016 19:12:40 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id B64FC1A02B4 for ; Tue, 8 Mar 2016 19:12:39 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -3.549 X-Spam-Level: X-Spam-Status: No, score=-3.549 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.329] autolearn=disabled Received: from mx2-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id GeKE5Gekxcwy for ; Tue, 8 Mar 2016 19:12:37 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx2-lw-eu.apache.org (ASF Mail Server at mx2-lw-eu.apache.org) with SMTP id A3AA15F56B for ; Tue, 8 Mar 2016 19:12:36 +0000 (UTC) Received: (qmail 54707 invoked by uid 99); 8 Mar 2016 19:12:35 
-0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 08 Mar 2016 19:12:35 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8389BDFC6B; Tue, 8 Mar 2016 19:12:35 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: thw@apache.org To: commits@apex.incubator.apache.org Date: Tue, 08 Mar 2016 19:12:36 -0000 Message-Id: In-Reply-To: <2f3cf0e4e1744ed8805d4e5663e6f7c6@git.apache.org> References: <2f3cf0e4e1744ed8805d4e5663e6f7c6@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] incubator-apex-core git commit: APEXCORE-374 - Block with positive reference count is found during buffer server purge. When LogicalNode is torn down, its iterator must be closed to decrement reference count of the block it points to. APEXCORE-374 - Block with positive reference count is found during buffer server purge. When LogicalNode is torn down, its iterator must be closed to decrement reference count of the block it points to. 
Conflicts: bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/9d8d9fd7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/9d8d9fd7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/9d8d9fd7 Branch: refs/heads/master Commit: 9d8d9fd75733eb5b437f578fa3f3525e7f5c87dd Parents: b487b3b Author: Vlad Rozov Authored: Mon Mar 7 18:30:12 2016 -0800 Committer: Thomas Weise Committed: Tue Mar 8 11:11:10 2016 -0800 ---------------------------------------------------------------------- .../bufferserver/internal/DataList.java | 57 ++++++-------------- .../bufferserver/internal/LogicalNode.java | 35 +++++++----- .../datatorrent/bufferserver/server/Server.java | 7 +-- 3 files changed, 39 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/9d8d9fd7/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java index 95c32b0..2a01102 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java @@ -24,7 +24,6 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.Map; -import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -61,7 +60,6 @@ public class DataList private final int blockSize; private final HashMap> listeners = newHashMap(); protected final HashSet 
all_listeners = newHashSet(); - protected final HashMap iterators = newHashMap(); protected Block first; protected Block last; protected Storage storage; @@ -332,43 +330,18 @@ public class DataList return block.next; } - public Iterator newIterator(String identifier, long windowId) + public DataListIterator newIterator(long windowId) { //logger.debug("request for a new iterator {} and {}", identifier, windowId); - for (Block temp = first; temp != null; temp = temp.next) { + Block temp = first; + while (temp != last) { if (temp.starting_window >= windowId || temp.ending_window > windowId) { - DataListIterator dli = getIterator(temp); - iterators.put(identifier, dli); - //logger.debug("returning new iterator on temp = {}", temp); - return dli; - } - } - - DataListIterator dli = getIterator(last); - iterators.put(identifier, dli); - //logger.debug("returning new iterator on last = {}", last); - return dli; - } - - /** - * Release previous acquired iterator from this DataList - * - * @param iterator - * @return true if successfully released, false otherwise. 
- */ - public boolean delIterator(Iterator iterator) - { - if (iterator instanceof DataListIterator) { - DataListIterator dli = (DataListIterator)iterator; - for (Entry e : iterators.entrySet()) { - if (e.getValue() == dli) { - dli.close(); - iterators.remove(e.getKey()); - return true; - } + break; } + temp = temp.next; } - return false; + //logger.debug("returning new iterator on temp = {}", temp); + return getIterator(temp); } public void addDataListener(DataListener dl) @@ -506,19 +479,21 @@ public class DataList int oldestBlockIndex = Integer.MAX_VALUE; int oldestReadOffset = Integer.MAX_VALUE; - for (Map.Entry entry : iterators.entrySet()) { - Integer index = indices.get(entry.getValue().da); + for (DataListener dl : all_listeners) { + LogicalNode logicalNode = (LogicalNode)dl; + DataListIterator dli = logicalNode.getIterator(); + Integer index = indices.get(dli.da); if (index == null) { // error throw new RuntimeException("problemo!"); } if (index < oldestBlockIndex) { oldestBlockIndex = index; - oldestReadOffset = entry.getValue().getReadOffset(); - status.slowestConsumer = entry.getKey(); - } else if (index == oldestBlockIndex && entry.getValue().getReadOffset() < oldestReadOffset) { - oldestReadOffset = entry.getValue().getReadOffset(); - status.slowestConsumer = entry.getKey(); + oldestReadOffset = dli.getReadOffset(); + status.slowestConsumer = logicalNode.getIdentifier(); + } else if (index == oldestBlockIndex && dli.getReadOffset() < oldestReadOffset) { + oldestReadOffset = dli.getReadOffset(); + status.slowestConsumer = logicalNode.getIdentifier(); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/9d8d9fd7/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java 
index 761bbea..ab76d01 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java @@ -20,7 +20,6 @@ package com.datatorrent.bufferserver.internal; import java.util.Collection; import java.util.HashSet; -import java.util.Iterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,6 +46,7 @@ import com.datatorrent.netlet.EventLoop; */ public class LogicalNode implements DataListener { + private final String identifier; private final String upstream; private final String group; private final HashSet physicalNodes; @@ -59,25 +59,21 @@ public class LogicalNode implements DataListener /** * + * @param identifier * @param upstream * @param group * @param iterator - * @param skipUptoWindowId + * @param skipWindowId */ - public LogicalNode(String upstream, String group, Iterator iterator, long skipUptoWindowId) + public LogicalNode(String identifier, String upstream, String group, DataListIterator iterator, long skipWindowId) { + this.identifier = identifier; this.upstream = upstream; this.group = group; this.physicalNodes = new HashSet(); this.partitions = new HashSet(); - - if (iterator instanceof DataListIterator) { - this.iterator = (DataListIterator)iterator; - } else { - throw new IllegalArgumentException("iterator does not belong to DataListIterator class"); - } - - skipWindowId = skipUptoWindowId; + this.iterator = iterator; + this.skipWindowId = skipWindowId; } /** @@ -91,13 +87,14 @@ public class LogicalNode implements DataListener /** * - * @return Iterator + * @return DataListIterator */ - public Iterator getIterator() + public DataListIterator getIterator() { return iterator; } + /** * * @param connection @@ -334,6 +331,15 @@ public class LogicalNode implements DataListener return upstream; } + /** + * + * @return the identifier + */ + public String getIdentifier() + { + return identifier; + } + public void boot(EventLoop 
eventloop) { for (PhysicalNode pn : physicalNodes) { @@ -345,7 +351,8 @@ public class LogicalNode implements DataListener @Override public String toString() { - return "LogicalNode{" + "upstream=" + upstream + ", group=" + group + ", partitions=" + partitions + + return "LogicalNode@" + Integer.toHexString(hashCode()) + + "identifier=" + identifier + ", upstream=" + upstream + ", group=" + group + ", partitions=" + partitions + ", iterator=" + iterator + '}'; } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/9d8d9fd7/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java index 353eb2b..8a1fac7 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java @@ -271,10 +271,7 @@ public class Server implements ServerListener } long skipWindowId = (long)request.getBaseSeconds() << 32 | request.getWindowId(); - ln = new LogicalNode(upstream_identifier, - type, - dl.newIterator(identifier, skipWindowId), - skipWindowId); + ln = new LogicalNode(identifier, upstream_identifier, type, dl.newIterator(skipWindowId), skipWindowId); int mask = request.getMask(); if (mask != 0) { @@ -584,10 +581,10 @@ public class Server implements ServerListener DataList dl = publisherBuffers.get(ln.getUpstream()); if (dl != null) { dl.removeDataListener(ln); - dl.delIterator(ln.getIterator()); } subscriberGroups.remove(ln.getGroup()); } + ln.getIterator().close(); } }