Return-Path: X-Original-To: apmail-geode-commits-archive@minotaur.apache.org Delivered-To: apmail-geode-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 44A0B1999B for ; Fri, 15 Apr 2016 03:07:41 +0000 (UTC) Received: (qmail 43734 invoked by uid 500); 15 Apr 2016 03:07:40 -0000 Delivered-To: apmail-geode-commits-archive@geode.apache.org Received: (qmail 43554 invoked by uid 500); 15 Apr 2016 03:07:40 -0000 Mailing-List: contact commits-help@geode.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.incubator.apache.org Delivered-To: mailing list commits@geode.incubator.apache.org Received: (qmail 43514 invoked by uid 99); 15 Apr 2016 03:07:35 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 15 Apr 2016 03:07:35 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id 251AAC0D64 for ; Fri, 15 Apr 2016 03:07:35 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -3.221 X-Spam-Level: X-Spam-Status: No, score=-3.221 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.001] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id azAox61df_jD for ; Fri, 15 Apr 2016 03:07:33 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with SMTP id 01D9260F40 for ; Fri, 15 Apr 2016 03:07:31 +0000 (UTC) Received: (qmail 43422 invoked by uid 99); 15 Apr 2016 03:07:30 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 15 Apr 2016 03:07:30 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 4C1F1E0615; Fri, 15 Apr 2016 03:07:30 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jinmeiliao@apache.org To: commits@geode.incubator.apache.org Date: Fri, 15 Apr 2016 03:07:36 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [07/32] incubator-geode git commit: GEODE-1199: fix off-heap leak in netWrite GEODE-1199: fix off-heap leak in netWrite Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/80533ba8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/80533ba8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/80533ba8 Branch: refs/heads/feature/GEODE-17-2 Commit: 80533ba8503e8fa04aed3291b2d4efbc29e61535 Parents: fbee35c Author: Darrel Schneider Authored: Thu Apr 7 16:20:18 2016 -0700 Committer: Darrel Schneider Committed: Wed Apr 13 10:16:20 2016 -0700 ---------------------------------------------------------------------- .../cache/SearchLoadAndWriteProcessor.java | 28 ++++++-- .../cache/SearchLoadAndWriteProcessorTest.java | 68 ++++++++++++++++++++ 2 files changed, 90 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/80533ba8/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/SearchLoadAndWriteProcessor.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/SearchLoadAndWriteProcessor.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/SearchLoadAndWriteProcessor.java index 8224fc2..d9729a7 100755 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/SearchLoadAndWriteProcessor.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/SearchLoadAndWriteProcessor.java @@ -74,6 +74,8 @@ import com.gemstone.gemfire.internal.cache.versions.VersionTag; import com.gemstone.gemfire.internal.i18n.LocalizedStrings; import com.gemstone.gemfire.internal.logging.LogService; import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; +import com.gemstone.gemfire.internal.offheap.annotations.Released; +import com.gemstone.gemfire.internal.offheap.annotations.Retained; /** @@ -211,7 +213,7 @@ public class SearchLoadAndWriteProcessor implements MembershipListener { int action = paction; this.requestInProgress = true; - Scope scope = this.region.scope; + Scope scope = this.region.getScope(); if (localWriter != null) { doLocalWrite(localWriter, event, action); this.requestInProgress = false; @@ -220,14 +222,22 @@ public class SearchLoadAndWriteProcessor implements MembershipListener { if (scope == Scope.LOCAL && (region.getPartitionAttributes() == null)) { return false; } + @Released CacheEvent listenerEvent = getEventForListener(event); + try { if (action == BEFOREUPDATE && listenerEvent.getOperation().isCreate()) { action = BEFORECREATE; } - boolean cacheWrote = netWrite(getEventForListener(event), action, netWriteRecipients); + boolean cacheWrote = netWrite(listenerEvent, action, netWriteRecipients); this.requestInProgress = false; return cacheWrote; - + } finally { + if (event != listenerEvent) { + if (listenerEvent instanceof EntryEventImpl) { + ((EntryEventImpl) listenerEvent).release(); + } + } + } } public void memberJoined(InternalDistributedMember id) { @@ -810,16 +820,20 @@ public class SearchLoadAndWriteProcessor implements MembershipListener { /** * Returns an event for listener notification. The event's operation - * may be altered to conform to the ConcurrentMap implementation specification + * may be altered to conform to the ConcurrentMap implementation specification. + * If the returned value is not == to the event parameter then the caller + * is responsible for releasing it. * @param event the original event * @return the original event or a new event having a change in operation */ + @Retained private CacheEvent getEventForListener(CacheEvent event) { Operation op = event.getOperation(); if (!op.isEntry()) { return event; } else { EntryEventImpl r = (EntryEventImpl)event; + @Retained EntryEventImpl result = r; if (r.isSingleHop()) { // fix for bug #46130 - origin remote incorrect for one-hop operation in receiver @@ -856,6 +870,7 @@ public class SearchLoadAndWriteProcessor implements MembershipListener { return false; } } + @Released CacheEvent event = getEventForListener(pevent); int action = paction; @@ -898,8 +913,9 @@ public class SearchLoadAndWriteProcessor implements MembershipListener { /** Return true if cache writer was invoked */ private boolean netWrite(CacheEvent event, int action, Set writeCandidateSet) throws CacheWriterException, TimeoutException { - - // assert !writeCandidateSet.isEmpty(); + if (writeCandidateSet == null || writeCandidateSet.isEmpty()) { + return false; + } ArrayList list = new ArrayList(writeCandidateSet); Collections.shuffle(list); InternalDistributedMember[] writeCandidates = (InternalDistributedMember[])list. http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/80533ba8/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/SearchLoadAndWriteProcessorTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/SearchLoadAndWriteProcessorTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/SearchLoadAndWriteProcessorTest.java new file mode 100644 index 0000000..57aca37 --- /dev/null +++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/SearchLoadAndWriteProcessorTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.internal.cache; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.gemstone.gemfire.cache.Operation; +import com.gemstone.gemfire.cache.Scope; +import com.gemstone.gemfire.internal.offheap.StoredObject; +import com.gemstone.gemfire.test.junit.categories.UnitTest; + +@Category(UnitTest.class) +public class SearchLoadAndWriteProcessorTest { + + /** + * This test verifies the fix for GEODE-1199. + * It verifies that when doNetWrite is called with an event + * that has a StoredObject value that it will have "release" + * called on it. + */ + @Test + public void verifyThatOffHeapReleaseIsCalledAfterNetWrite() { + // setup + SearchLoadAndWriteProcessor processor = SearchLoadAndWriteProcessor.getProcessor(); + LocalRegion lr = mock(LocalRegion.class); + when(lr.getScope()).thenReturn(Scope.DISTRIBUTED_ACK); + Object key = "key"; + StoredObject value = mock(StoredObject.class); + when(value.hasRefCount()).thenReturn(true); + when(value.retain()).thenReturn(true); + Object cbArg = null; + KeyInfo keyInfo = new KeyInfo(key, value, cbArg); + when(lr.getKeyInfo(any(), any(), any())).thenReturn(keyInfo); + processor.region = lr; + EntryEventImpl event = EntryEventImpl.create(lr, Operation.REPLACE, key, value, cbArg, false, null); + + try { + // the test + processor.doNetWrite(event, null, null, 0); + + // verification + verify(value, times(2)).retain(); + verify(value, times(1)).release(); + + } finally { + processor.release(); + } + } + +}