From commits-return-26906-archive-asf-public=cust-asf.ponee.io@geode.apache.org Wed May 2 19:58:24 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 0883B18065D for ; Wed, 2 May 2018 19:58:21 +0200 (CEST) Received: (qmail 73580 invoked by uid 500); 2 May 2018 17:58:21 -0000 Mailing-List: contact commits-help@geode.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.apache.org Delivered-To: mailing list commits@geode.apache.org Received: (qmail 73570 invoked by uid 99); 2 May 2018 17:58:21 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 02 May 2018 17:58:21 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 4417380F89; Wed, 2 May 2018 17:58:20 +0000 (UTC) Date: Wed, 02 May 2018 17:58:19 +0000 To: "commits@geode.apache.org" Subject: [geode] branch develop updated: GEODE-845: Rename and fixup TXExpirationIntegrationTest MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <152528389957.26309.4566113830358229111@gitbox.apache.org> From: klund@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: geode X-Git-Refname: refs/heads/develop X-Git-Reftype: branch X-Git-Oldrev: baedf7941c56fccb66aba231ff3c1a5e2ec86330 X-Git-Newrev: af54b49b8f73514dc239fc59f2f15982f3cb3b4d X-Git-Rev: af54b49b8f73514dc239fc59f2f15982f3cb3b4d X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. klund pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git The following commit(s) were added to refs/heads/develop by this push: new af54b49 GEODE-845: Rename and fixup TXExpirationIntegrationTest af54b49 is described below commit af54b49b8f73514dc239fc59f2f15982f3cb3b4d Author: Kirk Lund AuthorDate: Tue May 1 15:43:29 2018 -0700 GEODE-845: Rename and fixup TXExpirationIntegrationTest * rename TXExpiryJUnitTest to TXExpirationIntegrationTest * fix flakiness in TXExpirationIntegrationTest --- .../geode/internal/cache/InternalRegion.java | 4 + .../apache/geode/internal/cache/LocalRegion.java | 2 + .../java/org/apache/geode/ExpirationDetector.java | 86 +++++ .../apache/geode/TXExpirationIntegrationTest.java | 388 +++++++++++++++++++ .../java/org/apache/geode/TXExpiryJUnitTest.java | 424 --------------------- ...=> DistributedTXExpirationIntegrationTest.java} | 32 +- .../internal/cache/RemoteTransactionDUnitTest.java | 188 ++++++++- 7 files changed, 674 insertions(+), 450 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java index fb9fe1c..7633713 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java @@ -375,4 +375,8 @@ public interface InternalRegion extends Region, HasCachePerfStats, RegionEntryCo Object getIMSync(); IndexManager setIndexManager(IndexManager idxMgr); + + RegionTTLExpiryTask getRegionTTLExpiryTask(); + + RegionIdleExpiryTask getRegionIdleExpiryTask(); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java index bd909e1..88f6359 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java @@ -8094,6 +8094,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, * Used by unit tests to get access to the RegionIdleExpiryTask of this region. Returns null if no * task exists. */ + @Override public RegionIdleExpiryTask getRegionIdleExpiryTask() { return this.regionIdleExpiryTask; } @@ -8102,6 +8103,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, * Used by unit tests to get access to the RegionTTLExpiryTask of this region. Returns null if no * task exists. */ + @Override public RegionTTLExpiryTask getRegionTTLExpiryTask() { return this.regionTTLExpiryTask; } diff --git a/geode-core/src/test/java/org/apache/geode/ExpirationDetector.java b/geode-core/src/test/java/org/apache/geode/ExpirationDetector.java new file mode 100644 index 0000000..0eb926b --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/ExpirationDetector.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.util.concurrent.TimeUnit; + +import org.apache.geode.internal.cache.ExpiryTask; +import org.apache.geode.internal.cache.ExpiryTask.ExpiryTaskListener; + +/** + * Used to detect that a particular ExpiryTask has expired. + */ +public class ExpirationDetector implements ExpiryTaskListener { + + private volatile boolean executed; + private volatile boolean expired; + private volatile boolean rescheduled; + + private final ExpiryTask expiryTask; + + public ExpirationDetector(ExpiryTask expiry) { + assertThat(expiry).isNotNull(); + expiryTask = expiry; + } + + @Override + public void afterCancel(ExpiryTask expiryTask) { + // nothing + } + + @Override + public void afterSchedule(ExpiryTask expiryTask) { + // nothing + } + + @Override + public void afterReschedule(ExpiryTask expiryTask) { + if (expiryTask == this.expiryTask) { + if (!hasExpired()) { + ExpiryTask.suspendExpiration(); + } + rescheduled = true; + } + } + + @Override + public void afterExpire(ExpiryTask expiryTask) { + if (expiryTask == this.expiryTask) { + expired = true; + } + } + + @Override + public void afterTaskRan(ExpiryTask expiryTask) { + if (expiryTask == this.expiryTask) { + executed = true; + } + } + + public void awaitExecuted(long timeout, TimeUnit unit) { + await().atMost(timeout, unit).until(() -> executed); + } + + public boolean wasRescheduled() { + return rescheduled; + } + + public boolean hasExpired() { + return expired; + } +} diff --git a/geode-core/src/test/java/org/apache/geode/TXExpirationIntegrationTest.java b/geode-core/src/test/java/org/apache/geode/TXExpirationIntegrationTest.java new file mode 100644 index 0000000..aacd6ae --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/TXExpirationIntegrationTest.java @@ -0,0 +1,388 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode; + +import static java.util.concurrent.TimeUnit.MINUTES; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.geode.cache.ExpirationAction.DESTROY; +import static org.apache.geode.cache.ExpirationAction.INVALIDATE; +import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS; +import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.awaitility.Awaitility.await; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; + +import java.util.Properties; +import java.util.function.Consumer; + +import junitparams.JUnitParamsRunner; +import junitparams.Parameters; +import junitparams.naming.TestCaseName; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import org.apache.geode.cache.AttributesMutator; +import org.apache.geode.cache.CacheFactory; +import org.apache.geode.cache.CacheTransactionManager; +import org.apache.geode.cache.CommitConflictException; +import org.apache.geode.cache.EntryEvent; +import org.apache.geode.cache.ExpirationAttributes; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionEvent; +import org.apache.geode.cache.RegionFactory; +import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.cache.util.CacheListenerAdapter; +import org.apache.geode.internal.cache.ExpiryTask; +import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.cache.InternalRegion; +import org.apache.geode.internal.cache.LocalRegion; +import org.apache.geode.internal.cache.TXManagerImpl; +import org.apache.geode.internal.cache.TXStateProxy; +import org.apache.geode.test.junit.categories.IntegrationTest; + +/** + * Tests transaction expiration functionality + * + * @since GemFire 4.0 + */ +@Category(IntegrationTest.class) +@RunWith(JUnitParamsRunner.class) +public class TXExpirationIntegrationTest { + + private static final String REGION_NAME = + TXExpirationIntegrationTest.class.getSimpleName() + "_region"; + private static final String KEY = "key"; + + private InternalCache cache; + private CacheTransactionManager transactionManager; + + @Before + public void setUp() throws Exception { + cache = (InternalCache) new CacheFactory(getConfig()).create(); + transactionManager = cache.getCacheTransactionManager(); + } + + protected Properties getConfig() { + Properties config = new Properties(); + config.setProperty(LOCATORS, ""); + config.setProperty(MCAST_PORT, "0"); + return config; + } + + @After + public void tearDown() throws Exception { + try { + transactionManager.rollback(); + } catch (IllegalStateException ignore) { + } + cache.close(); + + ExpiryTask.permitExpiration(); + } + + @Test + @Parameters({"ENTRY_IDLE_DESTROY", "ENTRY_IDLE_INVALIDATE", "ENTRY_TTL_DESTROY", + "ENTRY_TTL_INVALIDATE"}) + @TestCaseName("{method}({params})") + public void entryExpirationDoesNotCauseConflict(ExpirationOperation operation) throws Exception { + InternalRegion region = createRegion(REGION_NAME); + AttributesMutator mutator = region.getAttributesMutator(); + + KeyCacheListener keyCacheListener = spy(new KeyCacheListener()); + mutator.addCacheListener(keyCacheListener); + + ExpiryTask.suspendExpiration(); + operation.mutate(mutator); + + region.put(KEY, "value1"); + + transactionManager.begin(); + region.put(KEY, "value2"); + awaitEntryExpiration(region, KEY); // enables expiration + assertThat(region.getEntry(KEY).getValue()).isEqualTo("value2"); + + ExpiryTask.suspendExpiration(); + transactionManager.commit(); + + assertThat(region.getEntry(KEY).getValue()).isEqualTo("value2"); + awaitEntryExpiration(region, KEY); // enables expiration + operation.verifyInvoked(keyCacheListener); + + operation.verifyState(region); + } + + @Test + @Parameters({"ENTRY_IDLE_DESTROY", "ENTRY_IDLE_INVALIDATE", "ENTRY_TTL_DESTROY", + "ENTRY_TTL_INVALIDATE"}) + @TestCaseName("{method}({params})") + public void entryExpirationContinuesAfterCommitConflict(ExpirationOperation operation) + throws Exception { + Region region = createRegion(REGION_NAME); + + AttributesMutator mutator = region.getAttributesMutator(); + + KeyCacheListener keyCacheListener = spy(new KeyCacheListener()); + mutator.addCacheListener(keyCacheListener); + + ExpiryTask.suspendExpiration(); + operation.mutate(mutator); + + ExpiryTask.suspendExpiration(); + region.put(KEY, "value1"); + + transactionManager.begin(); + region.put(KEY, "value2"); + awaitEntryExpiration(region, KEY); // enables expiration + assertThat(region.getEntry(KEY).getValue()).isEqualTo("value2"); + + ExpiryTask.suspendExpiration(); + + TXManagerImpl txManagerImpl = (TXManagerImpl) transactionManager; + TXStateProxy txStateProxy = txManagerImpl.pauseTransaction(); + region.put(KEY, "conflict"); + txManagerImpl.unpauseTransaction(txStateProxy); + + assertThatThrownBy(() -> transactionManager.commit()) + .isInstanceOf(CommitConflictException.class); + + awaitEntryExpiration(region, KEY); // enables expiration + operation.verifyInvoked(keyCacheListener); + operation.verifyState(region); + } + + @Test + @Parameters({"ENTRY_IDLE_DESTROY", "ENTRY_IDLE_INVALIDATE", "ENTRY_TTL_DESTROY", + "ENTRY_TTL_INVALIDATE"}) + @TestCaseName("{method}({params})") + public void entryExpirationContinuesAfterRollback(ExpirationOperation operation) + throws Exception { + Region region = createRegion(REGION_NAME); + + AttributesMutator mutator = region.getAttributesMutator(); + + KeyCacheListener keyCacheListener = spy(new KeyCacheListener()); + mutator.addCacheListener(keyCacheListener); + + ExpiryTask.suspendExpiration(); + operation.mutate(mutator); + + region.put(KEY, "value1"); + + transactionManager.begin(); + region.put(KEY, "value2"); + awaitEntryExpiration(region, KEY); // enables expiration + assertThat(region.getEntry(KEY).getValue()).isEqualTo("value2"); + + ExpiryTask.suspendExpiration(); + + transactionManager.rollback(); + + awaitEntryExpiration(region, KEY); // enables expiration + operation.verifyInvoked(keyCacheListener); + operation.verifyState(region); + } + + @Test + @Parameters({"REGION_IDLE_DESTROY", "REGION_IDLE_INVALIDATE", "REGION_TTL_DESTROY", + "REGION_TTL_INVALIDATE"}) + @TestCaseName("{method}({params})") + public void regionExpirationDoesNotCauseConflict(ExpirationOperation operation) throws Exception { + Region region = createRegion(REGION_NAME); + + KeyCacheListener keyCacheListener = spy(new KeyCacheListener()); + + AttributesMutator mutator = region.getAttributesMutator(); + mutator.addCacheListener(keyCacheListener); + + ExpiryTask.suspendExpiration(); + operation.mutate(mutator); + + transactionManager.begin(); + region.put(KEY, "value1"); + + awaitRegionExpiration(region); // enables expiration + assertThat(region.getEntry(KEY).getValue()).isEqualTo("value1"); + + ExpiryTask.suspendExpiration(); + transactionManager.commit(); + + assertThat(region.getEntry(KEY).getValue()).isEqualTo("value1"); + + awaitRegionExpiration(region); // enables expiration + operation.verifyInvoked(keyCacheListener); + operation.verifyState(region); + } + + private enum ExpirationOperation { + ENTRY_IDLE_DESTROY(m -> m.setEntryIdleTimeout(new ExpirationAttributes(1, DESTROY)), + s -> verify(s, atLeast(1)).afterDestroyKey(eq(KEY)), + r -> assertThat(r.containsKey(KEY)).isFalse()), + ENTRY_IDLE_INVALIDATE(m -> m.setEntryIdleTimeout(new ExpirationAttributes(1, INVALIDATE)), + s -> verify(s, atLeast(1)).afterInvalidateKey(eq(KEY)), + r -> assertThat(r.get(KEY)).isNull()), + ENTRY_TTL_DESTROY(m -> m.setEntryTimeToLive(new ExpirationAttributes(1, DESTROY)), + s -> verify(s, atLeast(1)).afterDestroyKey(eq(KEY)), + r -> assertThat(r.containsKey(KEY)).isFalse()), + ENTRY_TTL_INVALIDATE(m -> m.setEntryTimeToLive(new ExpirationAttributes(1, INVALIDATE)), + s -> verify(s, atLeast(1)).afterInvalidateKey(eq(KEY)), + r -> assertThat(r.get(KEY)).isNull()), + REGION_IDLE_DESTROY(m -> m.setRegionIdleTimeout(new ExpirationAttributes(1, DESTROY)), + s -> verify(s, atLeast(1)).afterRegionDestroyName(eq(REGION_NAME)), + r -> await().atMost(1, MINUTES).until(() -> r.isDestroyed())), + REGION_IDLE_INVALIDATE(m -> m.setRegionIdleTimeout(new ExpirationAttributes(1, INVALIDATE)), + s -> verify(s, atLeast(1)).afterRegionInvalidateName(eq(REGION_NAME)), + r -> await().atMost(1, MINUTES).until(() -> r.get(KEY) == null)), + REGION_TTL_DESTROY(m -> m.setRegionTimeToLive(new ExpirationAttributes(1, DESTROY)), + s -> verify(s, atLeast(1)).afterRegionDestroyName(eq(REGION_NAME)), + r -> await().atMost(1, MINUTES).until(() -> r.isDestroyed())), + REGION_TTL_INVALIDATE(m -> m.setRegionTimeToLive(new ExpirationAttributes(1, INVALIDATE)), + s -> verify(s, atLeast(1)).afterRegionInvalidateName(eq(REGION_NAME)), + r -> await().atMost(1, MINUTES).until(() -> r.get(KEY) == null)); + + private final Consumer mutatorConsumer; + private final Consumer listenerConsumer; + private final Consumer regionConsumer; + + ExpirationOperation(Consumer mutatorConsumer, + Consumer listenerConsumer, Consumer regionConsumer) { + this.mutatorConsumer = mutatorConsumer; + this.listenerConsumer = listenerConsumer; + this.regionConsumer = regionConsumer; + } + + void mutate(AttributesMutator mutator) { + mutatorConsumer.accept(mutator); + } + + void verifyInvoked(KeyCacheListener keyCacheListener) { + listenerConsumer.accept(keyCacheListener); + } + + public void verifyState(Region region) { + regionConsumer.accept(region); + } + } + + private InternalRegion createRegion(String name) { + RegionFactory rf = cache.createRegionFactory(RegionShortcut.REPLICATE); + rf.setStatisticsEnabled(true); + + System.setProperty(LocalRegion.EXPIRY_MS_PROPERTY, "true"); + try { + return (InternalRegion) rf.create(name); + } finally { + System.clearProperty(LocalRegion.EXPIRY_MS_PROPERTY); + } + } + + private void awaitEntryExpired(Region region, String key) { + await().atMost(1, MINUTES).until(() -> region.getEntry(key) == null || region.get(key) == null); + } + + private void awaitEntryExpiration(Region region, String key) { + InternalRegion internalRegion = (InternalRegion) region; + try { + ExpirationDetector detector; + do { + detector = new ExpirationDetector(internalRegion.getEntryExpiryTask(key)); + ExpiryTask.expiryTaskListener = detector; + ExpiryTask.permitExpiration(); + detector.awaitExecuted(30, SECONDS); + } while (!detector.hasExpired() && detector.wasRescheduled()); + } finally { + ExpiryTask.expiryTaskListener = null; + } + } + + private void awaitRegionExpiration(Region region) { + InternalRegion internalRegion = (InternalRegion) region; + try { + ExpirationDetector detector; + do { + detector = new ExpirationDetector(internalRegion.getRegionTTLExpiryTask() != null + ? internalRegion.getRegionTTLExpiryTask() : internalRegion.getRegionIdleExpiryTask()); + ExpiryTask.expiryTaskListener = detector; + ExpiryTask.permitExpiration(); + detector.awaitExecuted(30, SECONDS); + } while (!detector.hasExpired() && detector.wasRescheduled()); + } finally { + ExpiryTask.expiryTaskListener = null; + } + } + + private static class KeyCacheListener extends CacheListenerAdapter { + + @Override + public void afterCreate(EntryEvent event) { + afterCreateKey(event.getKey()); + } + + @Override + public void afterUpdate(EntryEvent event) { + afterUpdateKey(event.getKey()); + } + + @Override + public void afterInvalidate(EntryEvent event) { + afterInvalidateKey(event.getKey()); + } + + @Override + public void afterDestroy(EntryEvent event) { + afterDestroyKey(event.getKey()); + } + + @Override + public void afterRegionInvalidate(RegionEvent event) { + afterRegionInvalidateName(event.getRegion().getName()); + } + + @Override + public void afterRegionDestroy(RegionEvent event) { + afterRegionDestroyName(event.getRegion().getName()); + } + + public void afterCreateKey(Object key) { + // nothing + } + + public void afterUpdateKey(Object key) { + // nothing + } + + public void afterInvalidateKey(Object key) { + // nothing + } + + public void afterDestroyKey(Object key) { + // nothing + } + + public void afterRegionInvalidateName(Object key) { + // nothing + } + + public void afterRegionDestroyName(Object key) { + // nothing + } + } +} diff --git a/geode-core/src/test/java/org/apache/geode/TXExpiryJUnitTest.java b/geode-core/src/test/java/org/apache/geode/TXExpiryJUnitTest.java deleted file mode 100644 index def0076..0000000 --- a/geode-core/src/test/java/org/apache/geode/TXExpiryJUnitTest.java +++ /dev/null @@ -1,424 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.geode; - -import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT; -import static org.junit.Assert.*; - -import java.util.Properties; -import java.util.concurrent.atomic.AtomicInteger; - -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import org.apache.geode.cache.*; -import org.apache.geode.cache.util.CacheListenerAdapter; -import org.apache.geode.internal.cache.*; -import org.apache.geode.internal.cache.ExpiryTask.ExpiryTaskListener; -import org.apache.geode.test.dunit.Assert; -import org.apache.geode.test.dunit.Wait; -import org.apache.geode.test.dunit.WaitCriterion; -import org.apache.geode.test.junit.categories.FlakyTest; -import org.apache.geode.test.junit.categories.IntegrationTest; - -/** - * Tests transaction expiration functionality - * - * @since GemFire 4.0 - */ -@Category(IntegrationTest.class) -public class TXExpiryJUnitTest { - - protected GemFireCacheImpl cache; - protected CacheTransactionManager txMgr; - - protected void createCache() throws CacheException { - Properties p = new Properties(); - p.setProperty(MCAST_PORT, "0"); // loner - this.cache = (GemFireCacheImpl) (new CacheFactory(p)).create(); - this.txMgr = this.cache.getCacheTransactionManager(); - } - - private void closeCache() { - if (this.cache != null) { - if (this.txMgr != null) { - try { - this.txMgr.rollback(); - } catch (IllegalStateException ignore) { - } - } - this.txMgr = null; - Cache c = this.cache; - this.cache = null; - c.close(); - } - } - - @Before - public void setUp() throws Exception { - createCache(); - } - - @After - public void tearDown() throws Exception { - closeCache(); - } - - @Test - public void testEntryTTLExpiration() throws CacheException { - generalEntryExpirationTest(createRegion("TXEntryTTL"), - new ExpirationAttributes(1, ExpirationAction.DESTROY), true); - } - - @Test - public void testEntryIdleExpiration() throws CacheException { - generalEntryExpirationTest(createRegion("TXEntryIdle"), - new ExpirationAttributes(1, ExpirationAction.DESTROY), false); - } - - private Region createRegion(String name) { - RegionFactory rf = this.cache.createRegionFactory(); - rf.setScope(Scope.DISTRIBUTED_NO_ACK); - rf.setStatisticsEnabled(true); - System.setProperty(LocalRegion.EXPIRY_MS_PROPERTY, "true"); - try { - return rf.create(name); - } finally { - System.getProperties().remove(LocalRegion.EXPIRY_MS_PROPERTY); - } - } - - public void generalEntryExpirationTest(final Region exprReg, - ExpirationAttributes exprAtt, boolean useTTL) throws CacheException { - final LocalRegion lr = (LocalRegion) exprReg; - final boolean wasDestroyed[] = {false}; - AttributesMutator mutator = exprReg.getAttributesMutator(); - final AtomicInteger ac = new AtomicInteger(); - final AtomicInteger au = new AtomicInteger(); - final AtomicInteger ai = new AtomicInteger(); - final AtomicInteger ad = new AtomicInteger(); - - if (useTTL) { - mutator.setEntryTimeToLive(exprAtt); - } else { - mutator.setEntryIdleTimeout(exprAtt); - } - final CacheListener cl = new CacheListenerAdapter() { - public void afterCreate(EntryEvent e) { - ac.incrementAndGet(); - } - - public void afterUpdate(EntryEvent e) { - au.incrementAndGet(); - } - - public void afterInvalidate(EntryEvent e) { - ai.incrementAndGet(); - } - - public void afterDestroy(EntryEvent e) { - ad.incrementAndGet(); - if (e.getKey().equals("key0")) { - synchronized (wasDestroyed) { - wasDestroyed[0] = true; - wasDestroyed.notifyAll(); - } - } - } - - public void afterRegionInvalidate(RegionEvent event) { - fail("Unexpected invocation of afterRegionInvalidate"); - } - - public void afterRegionDestroy(RegionEvent event) { - if (!event.getOperation().isClose()) { - fail("Unexpected invocation of afterRegionDestroy"); - } - } - }; - mutator.addCacheListener(cl); - try { - - ExpiryTask.suspendExpiration(); - // Test to ensure an expiration does not cause a conflict - for (int i = 0; i < 2; i++) { - exprReg.put("key" + i, "value" + i); - } - this.txMgr.begin(); - exprReg.put("key0", "value"); - waitForEntryExpiration(lr, "key0"); - assertEquals("value", exprReg.getEntry("key0").getValue()); - try { - ExpiryTask.suspendExpiration(); - this.txMgr.commit(); - } catch (CommitConflictException error) { - fail("Expiration should not cause commit to fail"); - } - assertEquals("value", exprReg.getEntry("key0").getValue()); - waitForEntryExpiration(lr, "key0"); - synchronized (wasDestroyed) { - assertEquals(true, wasDestroyed[0]); - } - assertTrue(!exprReg.containsKey("key0")); - // key1 is the canary for the rest of the entries - waitForEntryToBeDestroyed(exprReg, "key1"); - - // rollback and failed commit test, ensure expiration continues - for (int j = 0; j < 2; j++) { - synchronized (wasDestroyed) { - wasDestroyed[0] = false; - } - ExpiryTask.suspendExpiration(); - for (int i = 0; i < 2; i++) { - exprReg.put("key" + i, "value" + i); - } - this.txMgr.begin(); - exprReg.put("key0", "value"); - waitForEntryExpiration(lr, "key0"); - assertEquals("value", exprReg.getEntry("key0").getValue()); - String checkVal; - ExpiryTask.suspendExpiration(); - if (j == 0) { - checkVal = "value0"; - this.txMgr.rollback(); - } else { - checkVal = "conflictVal"; - final TXManagerImpl txMgrImpl = (TXManagerImpl) this.txMgr; - TXStateProxy tx = txMgrImpl.pauseTransaction(); - exprReg.put("key0", checkVal); - txMgrImpl.unpauseTransaction(tx); - try { - this.txMgr.commit(); - fail("Expected CommitConflictException!"); - } catch (CommitConflictException expected) { - } - } - waitForEntryExpiration(lr, "key0"); - synchronized (wasDestroyed) { - assertEquals(true, wasDestroyed[0]); - } - assertTrue(!exprReg.containsKey("key0")); - // key1 is the canary for the rest of the entries - waitForEntryToBeDestroyed(exprReg, "key1"); - } - } finally { - mutator.removeCacheListener(cl); - ExpiryTask.permitExpiration(); - } - } - - private void waitForEntryToBeDestroyed(final Region r, final String key) { - WaitCriterion waitForExpire = new WaitCriterion() { - public boolean done() { - return r.getEntry(key) == null; - } - - public String description() { - return "never saw entry destroy of " + key; - } - }; - Wait.waitForCriterion(waitForExpire, 3000, 10, true); - } - - public static void waitForEntryExpiration(LocalRegion lr, String key) { - try { - ExpirationDetector detector; - do { - detector = new ExpirationDetector(lr.getEntryExpiryTask(key)); - ExpiryTask.expiryTaskListener = detector; - ExpiryTask.permitExpiration(); - Wait.waitForCriterion(detector, 3000, 2, true); - } while (!detector.hasExpired() && detector.wasRescheduled()); - } finally { - ExpiryTask.expiryTaskListener = null; - } - } - - private void waitForRegionExpiration(LocalRegion lr, boolean ttl) { - try { - ExpirationDetector detector; - do { - detector = new ExpirationDetector( - ttl ? lr.getRegionTTLExpiryTask() : lr.getRegionIdleExpiryTask()); - ExpiryTask.expiryTaskListener = detector; - ExpiryTask.permitExpiration(); - Wait.waitForCriterion(detector, 3000, 2, true); - } while (!detector.hasExpired() && detector.wasRescheduled()); - } finally { - ExpiryTask.expiryTaskListener = null; - } - } - - - /** - * Used to detect that a particular ExpiryTask has expired. - */ - public static class ExpirationDetector implements ExpiryTaskListener, WaitCriterion { - private volatile boolean ran = false; - private volatile boolean expired = false; - private volatile boolean rescheduled = false; - public final ExpiryTask et; - - public ExpirationDetector(ExpiryTask et) { - assertNotNull(et); - this.et = et; - } - - @Override - public void afterCancel(ExpiryTask et) {} - - @Override - public void afterSchedule(ExpiryTask et) {} - - @Override - public void afterReschedule(ExpiryTask et) { - if (et == this.et) { - if (!hasExpired()) { - ExpiryTask.suspendExpiration(); - } - this.rescheduled = true; - } - } - - @Override - public void afterExpire(ExpiryTask et) { - if (et == this.et) { - this.expired = true; - } - } - - @Override - public void afterTaskRan(ExpiryTask et) { - if (et == this.et) { - this.ran = true; - } - } - - @Override - public boolean done() { - return this.ran; - } - - @Override - public String description() { - return "the expiry task " + this.et + " never ran"; - } - - public boolean wasRescheduled() { - return this.rescheduled; - } - - public boolean hasExpired() { - return this.expired; - } - } - - @Category(FlakyTest.class) // GEODE-845: time sensitive, expiration, eats exceptions (1 fixed), - // waitForCriterion, 3 second timeout - @Test - public void testRegionIdleExpiration() throws CacheException { - Region exprReg = createRegion("TXRegionIdle"); - generalRegionExpirationTest(exprReg, new ExpirationAttributes(1, ExpirationAction.INVALIDATE), - false); - generalRegionExpirationTest(exprReg, new ExpirationAttributes(1, ExpirationAction.DESTROY), - false); - } - - @Test - public void testRegionTTLExpiration() throws CacheException { - Region exprReg = createRegion("TXRegionTTL"); - generalRegionExpirationTest(exprReg, new ExpirationAttributes(1, ExpirationAction.INVALIDATE), - true); - generalRegionExpirationTest(exprReg, new ExpirationAttributes(1, ExpirationAction.DESTROY), - true); - } - - private void generalRegionExpirationTest(final Region exprReg, - ExpirationAttributes exprAtt, boolean useTTL) throws CacheException { - final LocalRegion lr = (LocalRegion) exprReg; - final ExpirationAction action = exprAtt.getAction(); - final boolean regionExpiry[] = {false}; - AttributesMutator mutator = exprReg.getAttributesMutator(); - final CacheListener cl = new CacheListenerAdapter() { - public void afterRegionInvalidate(RegionEvent event) { - synchronized (regionExpiry) { - regionExpiry[0] = true; - regionExpiry.notifyAll(); - } - } - - public void afterRegionDestroy(RegionEvent event) { - if (!event.getOperation().isClose()) { - synchronized (regionExpiry) { - regionExpiry[0] = true; - regionExpiry.notifyAll(); - } - } - } - }; - mutator.addCacheListener(cl); - // Suspend before enabling region expiration to prevent - // it from happening before we do the put. - ExpiryTask.suspendExpiration(); - try { - if (useTTL) { - mutator.setRegionTimeToLive(exprAtt); - } else { - mutator.setRegionIdleTimeout(exprAtt); - } - - // Create some keys and age them, I wish we could fake/force the age - // instead of having to actually wait - for (int i = 0; i < 2; i++) { - exprReg.put("key" + i, "value" + i); - } - - String regName = exprReg.getName(); - // Test to ensure a region expiration does not cause a conflict - this.txMgr.begin(); - exprReg.put("key0", "value"); - waitForRegionExpiration(lr, useTTL); - assertEquals("value", exprReg.getEntry("key0").getValue()); - try { - ExpiryTask.suspendExpiration(); - this.txMgr.commit(); - } catch (CommitConflictException error) { - Assert.fail("Expiration should not cause commit to fail", error); - } - assertEquals("value", exprReg.getEntry("key0").getValue()); - waitForRegionExpiration(lr, useTTL); - synchronized (regionExpiry) { - assertEquals(true, regionExpiry[0]); - } - if (action == ExpirationAction.DESTROY) { - assertNull("listener saw Region expiration, expected a destroy operation!", - this.cache.getRegion(regName)); - } else { - assertTrue("listener saw Region expiration, expected invalidation", - !exprReg.containsValueForKey("key0")); - } - - } finally { - if (!exprReg.isDestroyed()) { - mutator.removeCacheListener(cl); - } - ExpiryTask.permitExpiration(); - } - - // @todo mitch test rollback and failed expiration - } -} diff --git a/geode-core/src/test/java/org/apache/geode/disttx/DistTXExpiryJUnitTest.java b/geode-core/src/test/java/org/apache/geode/disttx/DistributedTXExpirationIntegrationTest.java similarity index 51% rename from geode-core/src/test/java/org/apache/geode/disttx/DistTXExpiryJUnitTest.java rename to geode-core/src/test/java/org/apache/geode/disttx/DistributedTXExpirationIntegrationTest.java index 2948c3d..67c3ee6 100644 --- a/geode-core/src/test/java/org/apache/geode/disttx/DistTXExpiryJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/disttx/DistributedTXExpirationIntegrationTest.java @@ -14,42 +14,26 @@ */ package org.apache.geode.disttx; -import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT; +import static org.apache.geode.distributed.ConfigurationProperties.DISTRIBUTED_TRANSACTIONS; import java.util.Properties; import org.junit.experimental.categories.Category; -import org.apache.geode.TXExpiryJUnitTest; -import org.apache.geode.cache.AttributesFactory; -import org.apache.geode.cache.CacheException; -import org.apache.geode.cache.CacheFactory; -import org.apache.geode.cache.Scope; -import org.apache.geode.distributed.ConfigurationProperties; -import org.apache.geode.distributed.DistributedSystem; -import org.apache.geode.internal.cache.GemFireCacheImpl; +import org.apache.geode.TXExpirationIntegrationTest; import org.apache.geode.test.junit.categories.DistributedTransactionsTest; import org.apache.geode.test.junit.categories.IntegrationTest; /** - * Same tests as that of {@link TXExpiryJUnitTest} after setting "distributed-transactions" property - * to true + * Extends {@link TXExpirationIntegrationTest} with "distributed-transactions" enabled. */ @Category({IntegrationTest.class, DistributedTransactionsTest.class}) -public class DistTXExpiryJUnitTest extends TXExpiryJUnitTest { - - public DistTXExpiryJUnitTest() {} +public class DistributedTXExpirationIntegrationTest extends TXExpirationIntegrationTest { @Override - protected void createCache() throws CacheException { - Properties p = new Properties(); - p.setProperty(MCAST_PORT, "0"); // loner - p.setProperty(ConfigurationProperties.DISTRIBUTED_TRANSACTIONS, "true"); - this.cache = (GemFireCacheImpl) CacheFactory.create(DistributedSystem.connect(p)); - AttributesFactory af = new AttributesFactory(); - af.setScope(Scope.DISTRIBUTED_NO_ACK); - this.txMgr = this.cache.getCacheTransactionManager(); - assert (this.txMgr.isDistributed()); + protected Properties getConfig() { + Properties config = super.getConfig(); + config.setProperty(DISTRIBUTED_TRANSACTIONS, "true"); + return config; } - } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/RemoteTransactionDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/RemoteTransactionDUnitTest.java index 1422fab..5757746 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/RemoteTransactionDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/RemoteTransactionDUnitTest.java @@ -14,6 +14,7 @@ */ package org.apache.geode.internal.cache; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL; import static org.apache.geode.distributed.ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER; import static org.junit.Assert.assertEquals; @@ -44,7 +45,7 @@ import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.apache.geode.TXExpiryJUnitTest; +import org.apache.geode.ExpirationDetector; import org.apache.geode.cache.AttributesFactory; import org.apache.geode.cache.AttributesMutator; import org.apache.geode.cache.CacheEvent; @@ -125,12 +126,14 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { protected final String D_REFERENCE = "distrReference"; private final SerializableCallable getNumberOfTXInProgress = new SerializableCallable() { + @Override public Object call() throws Exception { TXManagerImpl mgr = getGemfireCache().getTxManager(); return mgr.hostedTransactionsInProgressForTest(); } }; private final SerializableCallable verifyNoTxState = new SerializableCallable() { + @Override public Object call() throws Exception { // TXManagerImpl mgr = getGemfireCache().getTxManager(); // assertIndexDetailsEquals(0, mgr.hostedTransactionsInProgressForTest()); @@ -218,6 +221,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { protected void initAccessorAndDataStore(VM accessor, VM datastore, final int redundantCopies) { accessor.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { createRegion(true/* accessor */, redundantCopies, null); return null; @@ -225,6 +229,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { }); datastore.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { createRegion(false/* accessor */, redundantCopies, null); populateData(); @@ -236,6 +241,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { protected void initAccessorAndDataStore(VM accessor, VM datastore1, VM datastore2, final int redundantCopies) { datastore2.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { createRegion(false/* accessor */, redundantCopies, null); return null; @@ -248,6 +254,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { private void initAccessorAndDataStoreWithInterestPolicy(VM accessor, VM datastore1, VM datastore2, final int redundantCopies) { datastore2.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { createRegion(false/* accessor */, redundantCopies, InterestPolicy.ALL); return null; @@ -269,6 +276,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { this.op = op; } + @Override public Object call() throws Exception { CacheTransactionManager mgr = getGemfireCache().getTxManager(); LogWriterUtils.getLogWriter().fine("testTXPut starting tx"); @@ -563,6 +571,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { final TXId txId = (TXId) accessor.invoke(new DoOpsInTX(OP.PUT)); datastore.invoke(new SerializableCallable("verify tx") { + @Override public Object call() throws Exception { TXManagerImpl mgr = getGemfireCache().getTxManager(); assertTrue(mgr.isHostedTxInProgress(txId)); @@ -571,6 +580,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { }); accessor.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { TXManagerImpl mgr = getGemfireCache().getTxManager(); TXStateProxy tx = mgr.pauseTransaction(); @@ -586,6 +596,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { }); datastore.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { TXManagerImpl mgr = getGemfireCache().getTxManager(); assertFalse(mgr.isHostedTxInProgress(txId)); @@ -594,6 +605,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { }); if (commit) { accessor.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { verifyAfterCommit(OP.PUT); return null; @@ -601,6 +613,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { }); } else { accessor.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { verifyAfterRollback(OP.PUT); return null; @@ -619,6 +632,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { final TXId txId = (TXId) accessor.invoke(new DoOpsInTX(OP.GET)); datastore.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { TXManagerImpl mgr = getGemfireCache().getTxManager(); assertTrue(mgr.isHostedTxInProgress(txId)); @@ -641,6 +655,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { }); accessor.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { CacheTransactionManager mgr = getGemfireCache().getTxManager(); mgr.commit(); @@ -659,13 +674,16 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { datastore.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { AttributesMutator am = getCache().getRegion(CUSTOMER).getAttributesMutator(); am.setCacheLoader(new CacheLoader() { + @Override public Object load(LoaderHelper helper) throws CacheLoaderException { return new Customer("sup dawg", "add"); } + @Override public void close() {} }); return null; @@ -673,6 +691,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { }); accessor.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { CacheTransactionManager mgr = getGemfireCache().getTxManager(); mgr.begin(); @@ -705,12 +724,14 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { datastore.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { return null; } }); accessor.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { Region cust = getCache().getRegion(CUSTOMER); CustId sup = new CustId(7); @@ -758,13 +779,16 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { datastore.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { AttributesMutator am = getCache().getRegion(CUSTOMER).getAttributesMutator(); am.setCacheLoader(new CacheLoader() { + @Override public Object load(LoaderHelper helper) throws CacheLoaderException { return new Customer("sup dawg", "addr"); } + @Override public void close() {} }); CacheTransactionManager mgr = getGemfireCache().getTxManager(); @@ -782,6 +806,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { }); accessor.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { return null; } @@ -802,6 +827,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { final TXId txId = (TXId) accessor.invoke(new DoOpsInTX(OP.PUT)); datastore.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { TXManagerImpl mgr = getGemfireCache().getTxManager(); assertTrue(mgr.isHostedTxInProgress(txId)); @@ -822,6 +848,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { } }); accessor.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { TXManagerImpl mgr = getGemfireCache().getTxManager(); assertNotNull(mgr.getTXState()); @@ -846,6 +873,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { final TXId txId = (TXId) accessor.invoke(new DoOpsInTX(OP.INVALIDATE)); datastore.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { TXManagerImpl mgr = getGemfireCache().getTxManager(); assertTrue(mgr.isHostedTxInProgress(txId)); @@ -866,6 +894,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { } }); accessor.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { CacheTransactionManager mgr = getGemfireCache().getTxManager(); mgr.commit(); @@ -888,6 +917,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { final TXId txId = (TXId) accessor.invoke(new DoOpsInTX(OP.DESTROY)); datastore.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { TXManagerImpl mgr = getGemfireCache().getTxManager(); assertTrue(mgr.isHostedTxInProgress(txId)); @@ -908,6 +938,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { } }); accessor.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { CacheTransactionManager mgr = getGemfireCache().getTxManager(); mgr.commit(); @@ -928,6 +959,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { final CustId newCustId = new CustId(10); final Customer updateCust = new Customer("customer10", "address10"); final TXId txId = (TXId) accessor.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { TXManagerImpl mgr = getGemfireCache().getTxManager(); mgr.begin(); @@ -963,6 +995,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { }); datastore.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { Region cust = getGemfireCache().getRegion(CUSTOMER); int hash1 = PartitionedRegionHelper.getHashKey((PartitionedRegion) cust, new CustId(1)); @@ -987,6 +1020,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { } }); accessor.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { TXManagerImpl mgr = getGemfireCache().getTxManager(); mgr.commit(); @@ -1026,6 +1060,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { final CustId custId = new CustId(1); final TXId txId = (TXId) accessor.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { Region cust = getGemfireCache().getRegion(CUSTOMER); Region ref = getGemfireCache().getRegion(D_REFERENCE); @@ -1045,6 +1080,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { } }); datastore.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { TXManagerImpl mgr = getGemfireCache().getTxManager(); assertTrue(mgr.isHostedTxInProgress(txId)); @@ -1065,6 +1101,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { } }); accessor.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { TXManagerImpl mgr = getGemfireCache().getTxManager(); mgr.commit(); @@ -1103,6 +1140,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { final CustId custId2 = new CustId(2); final CustId custId20 = new CustId(20); final TXId txId = (TXId) accessor.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { Region cust = getGemfireCache().getRegion(CUSTOMER); Region ref = getGemfireCache().getRegion(D_REFERENCE); @@ -1121,6 +1159,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { } }); datastore.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { TXManagerImpl mgr = getGemfireCache().getTxManager(); assertTrue(mgr.isHostedTxInProgress(txId)); @@ -1143,6 +1182,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { } }); accessor.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { TXManagerImpl mgr = getGemfireCache().getTxManager(); mgr.commit(); @@ -1194,6 +1234,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { VM datastore1 = host.getVM(1); VM datastore2 = host.getVM(3); datastore2.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { createRegion(false/* accessor */, 0, null); return null; @@ -1209,6 +1250,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { final CustId custId4 = new CustId(4); final CustId custId20 = new CustId(20); final TXId txId = (TXId) accessor.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { Region cust = getGemfireCache().getRegion(CUSTOMER); Region ref = getGemfireCache().getRegion(D_REFERENCE); @@ -1242,6 +1284,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { // Create a second data store. datastore2.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { createRegion(false/* accessor */, 1, null); return null; @@ -1256,6 +1299,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { final CustId custId4 = new CustId(4); final CustId custId20 = new CustId(20); final TXId txId = (TXId) accessor.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { Region cust = getGemfireCache().getRegion(CUSTOMER); Region ref = getGemfireCache().getRegion(D_REFERENCE); @@ -1270,6 +1314,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { }); SerializableCallable checkArtifacts = new SerializableCallable() { + @Override public Object call() throws Exception { PartitionedRegion cust = (PartitionedRegion) getGemfireCache().getRegion(CUSTOMER); assertNull(cust.get(custId0)); @@ -1292,6 +1337,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { final CustId custId = new CustId(1); final Customer updatedCust = new Customer("updated", "updated"); final TXId txId = (TXId) accessor.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { Region cust = getGemfireCache().getRegion(CUSTOMER); Region ref = getGemfireCache().getRegion(D_REFERENCE); @@ -1311,6 +1357,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { } }); datastore.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { TXManagerImpl mgr = getGemfireCache().getTxManager(); assertTrue(mgr.isHostedTxInProgress(txId)); @@ -1331,6 +1378,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { } }); accessor.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { TXManagerImpl mgr = getGemfireCache().getTxManager(); mgr.commit(); @@ -1370,6 +1418,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { VM datastore2 = host.getVM(2); datastore2.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { createRegion(false, 1, null); return null; @@ -1379,6 +1428,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { initAccessorAndDataStore(accessor, datastore1, 1); accessor.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { Region custRegion = getCache().getRegion(CUSTOMER); Region orderRegion = getCache().getRegion(ORDER); @@ -1443,6 +1493,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { VM accessor = getVMForTransactions(acc, datastore); accessor.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { Region ref = getCache().getRegion(D_REFERENCE); ref.getAttributesMutator().addCacheListener(new TestCacheListener(true)); @@ -1462,6 +1513,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { }); SerializableCallable addListenersToDataStore = new SerializableCallable() { + @Override public Object call() throws Exception { Region ref = getCache().getRegion(D_REFERENCE); ref.getAttributesMutator().addCacheListener(new TestCacheListener(false)); @@ -1487,6 +1539,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { if (op != OP.INVALIDATE) { // Ensure the cache writer was not fired in accessor accessor.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { Region cust = getCache().getRegion(CUSTOMER); assertFalse(((TestCacheWriter) cust.getAttributes().getCacheWriter()).wasFired); @@ -1500,6 +1553,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { // Ensure the cache writer was fired in the primary datastore.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { Region cust = getCache().getRegion(CUSTOMER); assertTrue(((TestCacheWriter) cust.getAttributes().getCacheWriter()).wasFired); @@ -1513,6 +1567,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { } accessor.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { CacheTransactionManager mgr = getGemfireCache().getTxManager(); mgr.commit(); @@ -1521,6 +1576,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { }); datastore.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { TestTxListener l = (TestTxListener) getGemfireCache().getTxManager().getListener(); assertTrue(l.isListenerInvoked()); @@ -1528,6 +1584,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { } }); SerializableCallable verifyListeners = new SerializableCallable() { + @Override public Object call() throws Exception { Region cust = getCache().getRegion(CUSTOMER); Region order = getCache().getRegion(ORDER); @@ -1614,34 +1671,44 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { this.isAccessor = isAccessor; } + @Override public void afterCreate(EntryEvent event) { verifyOrigin(event); verifyPutAll(event); } + @Override public void afterUpdate(EntryEvent event) { verifyOrigin(event); verifyPutAll(event); } + @Override public void afterDestroy(EntryEvent event) { verifyOrigin(event); } + @Override public void afterInvalidate(EntryEvent event) { verifyOrigin(event); } + @Override public void afterRegionClear(RegionEvent event) {} + @Override public void afterRegionCreate(RegionEvent event) {} + @Override public void afterRegionDestroy(RegionEvent event) {} + @Override public void afterRegionInvalidate(RegionEvent event) {} + @Override public void afterRegionLive(RegionEvent event) {} + @Override public void close() {} } @@ -1653,6 +1720,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { this.isAccessor = isAccessor; } + @Override public void beforeCreate(EntryEvent event) throws CacheWriterException { getGemfireCache().getLogger() .info("SWAP:beforeCreate:" + event + " op:" + event.getOperation()); @@ -1665,6 +1733,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { wasFired = true; } + @Override public void beforeUpdate(EntryEvent event) throws CacheWriterException { getGemfireCache().getLogger() .info("SWAP:beforeCreate:" + event + " op:" + event.getOperation()); @@ -1673,19 +1742,23 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { setFired(event); } + @Override public void beforeDestroy(EntryEvent event) throws CacheWriterException { verifyOrigin(event); setFired(event); } + @Override public void beforeRegionClear(RegionEvent event) throws CacheWriterException { setFired(null); } + @Override public void beforeRegionDestroy(RegionEvent event) throws CacheWriterException { setFired(null); } + @Override public void close() {} } @@ -1733,15 +1806,18 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { this.isAccessor = isAccessor; } + @Override public void afterCommit(TransactionEvent event) { listenerInvoked = true; verify(event); } + @Override public void afterFailedCommit(TransactionEvent event) { verify(event); } + @Override public void afterRollback(TransactionEvent event) { listenerInvoked = true; verify(event); @@ -1751,6 +1827,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { return this.listenerInvoked; } + @Override public void close() {} } @@ -1759,10 +1836,12 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { this.isAccessor = isAccessor; } + @Override public void beforeCommit(TransactionEvent event) { verify(event); } + @Override public void close() {} } @@ -1775,10 +1854,13 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { VM accessor = getVMForTransactions(acc, datastore); datastore.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { getGemfireCache().getTxManager().setWriter(new TransactionWriter() { + @Override public void close() {} + @Override public void beforeCommit(TransactionEvent event) throws TransactionWriterException { throw new TransactionWriterException("AssertionError"); } @@ -1788,6 +1870,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { }); accessor.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { getGemfireCache().getTxManager().begin(); Region r = getCache().getRegion(CUSTOMER); @@ -1823,6 +1906,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { VM taskVM = isAccessor ? accessor : datastore1; taskVM.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { Region custRegion = getCache().getRegion(CUSTOMER); TXManagerImpl mgr = getGemfireCache().getTxManager(); @@ -1836,6 +1920,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { datastore2.invoke(verifyNoTxState); taskVM.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { Region custRegion = getCache().getRegion(CUSTOMER); Region orderRegion = getCache().getRegion(ORDER); @@ -1870,6 +1955,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { assertEquals(1, txOnDatastore1 + txOnDatastore2); taskVM.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { CacheTransactionManager mgr = getGemfireCache().getTxManager(); mgr.commit(); @@ -1955,6 +2041,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { initAccessorAndDataStore(accessor, datastore1, datastore2, redundancy); accessor.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { Region custRegion = getCache().getRegion(CUSTOMER); Set originalSet; @@ -1997,6 +2084,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { datastore2.invoke(verifyNoTxState); accessor.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { Region custRegion = getCache().getRegion(CUSTOMER); Region orderRegion = getCache().getRegion(ORDER); @@ -2062,6 +2150,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { assertEquals(1, txOnDatastore1 + txOnDatastore2); accessor.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { CacheTransactionManager mgr = getGemfireCache().getTxManager(); mgr.commit(); @@ -2079,6 +2168,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { int originalSetSize; int expectedSetSize; + @Override public Object call() throws Exception { TXManagerImpl mgr = getGemfireCache().getTXMgr(); custRegion = (PartitionedRegion) getGemfireCache().getRegion(CUSTOMER); @@ -2195,6 +2285,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { initAccessorAndDataStore(accessor, datastore1, datastore2, 0); accessor.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { Region custRegion = getGemfireCache().getRegion(CUSTOMER); Region rr = getGemfireCache().getRegion(D_REFERENCE); @@ -2245,6 +2336,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { initAccessorAndDataStore(accessor, datastore1, datastore2, 0); accessor.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { Region custRegion = getGemfireCache().getRegion(CUSTOMER); Region rr = getGemfireCache().getRegion(D_REFERENCE); @@ -2295,6 +2387,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { initAccessorAndDataStore(accessor, datastore1, datastore2, 0); accessor.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { Region custRegion = getGemfireCache().getRegion(CUSTOMER); Region rr = getGemfireCache().getRegion(D_REFERENCE); @@ -2345,6 +2438,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { initAccessorAndDataStore(accessor, datastore1, datastore2, 0); SerializableCallable doIllegalIteration = new SerializableCallable() { + @Override public Object call() throws Exception { Region r = getGemfireCache().getRegion(CUSTOMER); Set keySet = r.keySet(); @@ -2446,6 +2540,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { class TXFunction implements Function { static final String id = "TXFunction"; + @Override public void execute(FunctionContext context) { Region r = null; r = getGemfireCache().getRegion(CUSTOMER); @@ -2455,18 +2550,22 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { context.getResultSender().lastResult(Boolean.TRUE); } + @Override public String getId() { return id; } + @Override public boolean hasResult() { return true; } + @Override public boolean optimizeForWrite() { return true; } + @Override public boolean isHA() { return false; } @@ -2494,6 +2593,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { initAccessorAndDataStore(accessor, datastore1, datastore2, 0); SerializableCallable registerFunction = new SerializableCallable() { + @Override public Object call() throws Exception { FunctionService.registerFunction(new TXFunction()); return null; @@ -2505,6 +2605,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { datastore2.invoke(registerFunction); accessor.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { PartitionedRegion custRegion = (PartitionedRegion) getGemfireCache().getRegion(CUSTOMER); TXManagerImpl mgr = getGemfireCache().getTXMgr(); @@ -2555,6 +2656,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { assertEquals(1, txOnDatastore1 + txOnDatastore2); accessor.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { CacheTransactionManager mgr = getGemfireCache().getTXMgr(); mgr.commit(); @@ -2563,6 +2665,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { }); datastore1.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { Region custRegion = getGemfireCache().getRegion(CUSTOMER); assertEquals(expectedCustomer, custRegion.get(expectedCustId)); @@ -2571,6 +2674,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { }); accessor.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { TXManagerImpl mgr = getGemfireCache().getTXMgr(); mgr.begin(); @@ -2665,6 +2769,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { accessor.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { TXManagerImpl mgr = getGemfireCache().getTxManager(); PartitionedRegion pr = (PartitionedRegion) getGemfireCache().getRegion(CUSTOMER); @@ -2694,6 +2799,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { this.isAccessor = isAccessor; } + @Override public Object call() throws Exception { AttributesFactory af = new AttributesFactory(); af.setDataPolicy(isAccessor ? DataPolicy.EMPTY : DataPolicy.REPLICATE); @@ -2717,6 +2823,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { accessor.invoke(new CreateDR(true)); SerializableCallable registerFunction = new SerializableCallable() { + @Override public Object call() throws Exception { FunctionService.registerFunction(new TXFunction()); return null; @@ -2728,6 +2835,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { datastore2.invoke(registerFunction); accessor.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { Region custRegion = getGemfireCache().getRegion(CUSTOMER); TXManagerImpl mgr = getGemfireCache().getTXMgr(); @@ -2747,6 +2855,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { }); datastore1.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { final Region custRegion = getGemfireCache().getRegion(CUSTOMER); TXManagerImpl mgr = getGemfireCache().getTXMgr(); @@ -2783,6 +2892,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { initAccessorAndDataStore(accessor, datastore1, datastore2, 0); SerializableCallable registerFunction = new SerializableCallable() { + @Override public Object call() throws Exception { FunctionService.registerFunction(new TXFunction()); return null; @@ -2794,6 +2904,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { datastore2.invoke(registerFunction); accessor.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { Region custRegion = getGemfireCache().getRegion(CUSTOMER); TXManagerImpl mgr = getGemfireCache().getTXMgr(); @@ -2819,6 +2930,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { assertEquals(1, txOnDatastore1 + txOnDatastore2); accessor.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { Region custRegion = getGemfireCache().getRegion(CUSTOMER); CacheTransactionManager mgr = getGemfireCache().getTXMgr(); @@ -2830,6 +2942,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { }); // test onMembers SerializableCallable getMember = new SerializableCallable() { + @Override public Object call() throws Exception { return getGemfireCache().getMyId(); } @@ -2837,6 +2950,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { final InternalDistributedMember ds1 = (InternalDistributedMember) datastore1.invoke(getMember); final InternalDistributedMember ds2 = (InternalDistributedMember) datastore2.invoke(getMember); accessor.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { PartitionedRegion pr = (PartitionedRegion) getGemfireCache().getRegion(CUSTOMER); // get owner for expectedKey @@ -2870,6 +2984,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { assertEquals(1, txOnDatastore1_1 + txOnDatastore2_1); accessor.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { Region custRegion = getGemfireCache().getRegion(CUSTOMER); CacheTransactionManager mgr = getGemfireCache().getTXMgr(); @@ -2881,6 +2996,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { }); // test function execution on data store final DistributedMember owner = (DistributedMember) accessor.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { PartitionedRegion pr = (PartitionedRegion) getGemfireCache().getRegion(CUSTOMER); return pr.getOwnerForKey(pr.getKeyInfo(expectedCustId)); @@ -2888,6 +3004,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { }); SerializableCallable testFnOnDs = new SerializableCallable() { + @Override public Object call() throws Exception { TXManagerImpl mgr = getGemfireCache().getTXMgr(); PartitionedRegion pr = (PartitionedRegion) getGemfireCache().getRegion(CUSTOMER); @@ -2905,6 +3022,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { } }; SerializableCallable closeTx = new SerializableCallable() { + @Override public Object call() throws Exception { Region custRegion = getGemfireCache().getRegion(CUSTOMER); CacheTransactionManager mgr = getGemfireCache().getTXMgr(); @@ -2933,6 +3051,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { // test that function is rejected if function target is not same as txState target accessor.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { CacheTransactionManager mgr = getGemfireCache().getTXMgr(); PartitionedRegion pr = (PartitionedRegion) getGemfireCache().getRegion(CUSTOMER); @@ -3008,6 +3127,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { VM accessor = getVMForTransactions(acc, datastore); datastore.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { getGemfireCache().getTxManager().addListener(new TestTxListener(false)); return null; @@ -3016,6 +3136,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { final CustId expectedCustId = new CustId(6); final Customer expectedCustomer = new Customer("customer6", "address6"); accessor.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { getGemfireCache().getTxManager().addListener(new TestTxListener(true)); Region custRegion = getCache().getRegion(CUSTOMER); @@ -3030,6 +3151,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { } }); datastore.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { Region custRegion = getCache().getRegion(CUSTOMER); assertNull(custRegion.get(expectedCustId)); @@ -3037,6 +3159,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { } }); accessor.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { Region custRegion = getCache().getRegion(CUSTOMER); Context ctx = getCache().getJNDIContext(); @@ -3052,6 +3175,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { } }); datastore.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { TestTxListener l = (TestTxListener) getGemfireCache().getTXMgr().getListener(); assertTrue(l.isListenerInvoked()); @@ -3073,6 +3197,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { int fireD = 0; int fireU = 0; + @Override public void beforeCreate(EntryEvent event) throws CacheWriterException { if (!event.isOriginRemote()) { throw new CacheWriterException("SUP?? This CREATE is supposed to be isOriginRemote"); @@ -3080,6 +3205,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { fireC++; } + @Override public void beforeDestroy(EntryEvent event) throws CacheWriterException { getGemfireCache().getLoggerI18n().fine("SWAP:writer:createEvent:" + event); if (!event.isOriginRemote()) { @@ -3088,6 +3214,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { fireD++; } + @Override public void beforeUpdate(EntryEvent event) throws CacheWriterException { if (!event.isOriginRemote()) { throw new CacheWriterException("SUP?? This UPDATE is supposed to be isOriginRemote"); @@ -3098,6 +3225,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { datastore.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { Region refRegion = getCache().getRegion(D_REFERENCE); refRegion.getAttributesMutator().setCacheWriter(new OriginRemoteRRWriter()); @@ -3110,6 +3238,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { accessor.invoke(new DoOpsInTX(OP.PUT)); accessor.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { TXManagerImpl mgr = getGemfireCache().getTxManager(); TXStateProxy tx = mgr.pauseTransaction(); @@ -3123,6 +3252,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { accessor.invoke(new DoOpsInTX(OP.DESTROY)); accessor.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { TXManagerImpl mgr = getGemfireCache().getTxManager(); TXStateProxy tx = mgr.pauseTransaction(); @@ -3137,6 +3267,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { accessor.invoke(new DoOpsInTX(OP.PUT)); accessor.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { TXManagerImpl mgr = getGemfireCache().getTxManager(); TXStateProxy tx = mgr.pauseTransaction(); @@ -3149,6 +3280,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { datastore.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { Region refRegion = getCache().getRegion(D_REFERENCE); OriginRemoteRRWriter w = (OriginRemoteRRWriter) refRegion.getAttributes().getCacheWriter(); @@ -3173,6 +3305,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { accessor.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { Region refRegion = getCache().getRegion(D_REFERENCE); refRegion.create("sup", "dawg"); @@ -3181,6 +3314,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { }); accessor.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { TXManagerImpl mgr = getGemfireCache().getTxManager(); TXStateProxy tx = mgr.pauseTransaction(); @@ -3193,6 +3327,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { accessor.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { Region refRegion = getCache().getRegion(D_REFERENCE); assertEquals("dawg", refRegion.get("sup")); @@ -3209,6 +3344,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { initAccessorAndDataStore(accessor, datastore, 0); accessor.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { Region cust = getCache().getRegion(CUSTOMER); TXManagerImpl mgr = getCache().getTxManager(); @@ -3219,11 +3355,13 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { }); final InternalDistributedMember member = (InternalDistributedMember) accessor.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { return getCache().getMyId(); } }); datastore.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { TXManagerImpl mgr = getCache().getTxManager(); assertEquals(1, mgr.hostedTransactionsInProgressForTest()); @@ -3257,6 +3395,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { initAccessorAndDataStore(accessor, datastore1, datastore2, 0); accessor.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { Map custMap = new HashMap(); for (int i = 0; i < 10; i++) { @@ -3447,6 +3586,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { initAccessorAndDataStore(accessor, datastore, 0); datastore.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { Region cust = getCache().getRegion(D_REFERENCE); cust.put("meow", "this is a meow, deal with it"); @@ -3458,6 +3598,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { }); accessor.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { Region cust = getCache().getRegion(D_REFERENCE); cust.getAttributesMutator().addCacheListener(new OneUpdateCacheListener()); @@ -3468,6 +3609,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { datastore.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { CacheTransactionManager mgr = getGemfireCache().getTxManager(); mgr.begin(); @@ -3480,6 +3622,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { }); accessor.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { Region cust = getCache().getRegion(D_REFERENCE); OneUpdateCacheListener rat = @@ -3492,6 +3635,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { }); datastore.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { Region cust = getCache().getRegion(D_REFERENCE); OneDestroyAndThenOneCreateCacheWriter wri = @@ -3584,6 +3728,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { protected Integer startServer(VM vm) { return (Integer) vm.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); CacheServer s = getCache().addCacheServer(); @@ -3597,6 +3742,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { protected void createClientRegion(VM vm, final int port, final boolean isEmpty, final boolean ri) { vm.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { ClientCacheFactory ccf = new ClientCacheFactory(); ccf.addPoolServer("localhost"/* getServerHostName(Host.getHost(0)) */, port); @@ -3626,17 +3772,20 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { boolean invoked = false; + @Override public void onError(CqEvent aCqEvent) { // TODO Auto-generated method stub } + @Override public void onEvent(CqEvent aCqEvent) { // TODO Auto-generated method stub invoked = true; } + @Override public void close() { // TODO Auto-generated method stub @@ -3765,6 +3914,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { createClientRegion(client, port, false, true); accessor.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { Region custRegion = getCache().getRegion(CUSTOMER); Region orderRegion = getCache().getRegion(ORDER); @@ -3781,6 +3931,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { }); client.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { Region custRegion = getCache().getRegion(CUSTOMER); Region orderRegion = getCache().getRegion(ORDER); @@ -3788,10 +3939,12 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { final ClientListener cl = (ClientListener) custRegion.getAttributes().getCacheListeners()[0]; WaitCriterion waitForListenerInvocation = new WaitCriterion() { + @Override public boolean done() { return cl.invoked; } + @Override public String description() { return "listener was never invoked"; } @@ -3811,6 +3964,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { VM client = host.getVM(1); datastore.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { AttributesFactory af = new AttributesFactory(); af.setScope(Scope.DISTRIBUTED_ACK); @@ -3825,6 +3979,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { final int port = startServer(datastore); client.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { ClientCacheFactory ccf = new ClientCacheFactory(); ccf.addPoolServer("localhost"/* getServerHostName(Host.getHost(0)) */, port); @@ -3844,6 +3999,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { }); datastore.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { Region ref = getCache().getRegion(D_REFERENCE); Region empty = getCache().getRegion(EMPTY_REGION); @@ -3861,14 +4017,17 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { }); client.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { Region empty = getCache().getRegion(EMPTY_REGION); final ClientListener l = (ClientListener) empty.getAttributes().getCacheListeners()[0]; WaitCriterion wc = new WaitCriterion() { + @Override public boolean done() { return l.invoked; } + @Override public String description() { return "listener invoked:" + l.invoked; } @@ -3891,6 +4050,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { createClientRegion(client, port, false, true); datastore.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { Region custRegion = getCache().getRegion(CUSTOMER); Region orderRegion = getCache().getRegion(ORDER); @@ -3907,6 +4067,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { }); client.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { Region custRegion = getCache().getRegion(CUSTOMER); Region orderRegion = getCache().getRegion(ORDER); @@ -3914,10 +4075,12 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { final ClientListener cl = (ClientListener) custRegion.getAttributes().getCacheListeners()[0]; WaitCriterion waitForListenerInvocation = new WaitCriterion() { + @Override public boolean done() { return cl.invoked; } + @Override public String description() { return "listener was never invoked"; } @@ -3938,6 +4101,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { initAccessorAndDataStoreWithInterestPolicy(accessor, datastore1, datastore2, 1); SerializableCallable registerListener = new SerializableCallable() { + @Override public Object call() throws Exception { Region custRegion = getCache().getRegion(CUSTOMER); custRegion.getAttributesMutator().addCacheListener(new ListenerInvocationCounter()); @@ -3948,6 +4112,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { datastore2.invoke(registerListener); datastore1.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { Region custRegion = getCache().getRegion(CUSTOMER); getCache().getCacheTransactionManager().begin(); @@ -3960,6 +4125,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { }); SerializableCallable getListenerCount = new SerializableCallable() { + @Override public Object call() throws Exception { Region custRegion = getCache().getRegion(CUSTOMER); ListenerInvocationCounter l = @@ -3994,6 +4160,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { final CustId custId = new CustId(19); datastore1.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { Region refRegion = getCache().getRegion(D_REFERENCE); assertNull(refRegion.get(custId)); @@ -4003,6 +4170,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { } }); datastore2.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { Region refRegion = getCache().getRegion(D_REFERENCE); assertNull(refRegion.get(custId)); @@ -4011,6 +4179,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { } }); datastore1.invoke(new SerializableCallable() { + @Override public Object call() throws Exception { try { getCache().getCacheTransactionManager().commit(); @@ -4082,6 +4251,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { assertEquals(Status.STATUS_ACTIVE, tx.getStatus()); final CountDownLatch latch = new CountDownLatch(1); Thread t = new Thread(new Runnable() { + @Override public void run() { Context ctx = getCache().getJNDIContext(); try { @@ -4263,7 +4433,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { r.put("nonTXkey", "nonTXvalue"); getCache().getCacheTransactionManager().begin(); r.put("key", "newvalue"); - TXExpiryJUnitTest.waitForEntryExpiration(lr, "key"); + waitForEntryExpiration(lr, "key"); } finally { ExpiryTask.permitExpiration(); } @@ -4496,4 +4666,18 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase { vm1.invoke(verifyAssert); } + + private void waitForEntryExpiration(LocalRegion lr, String key) { + try { + ExpirationDetector detector; + do { + detector = new ExpirationDetector(lr.getEntryExpiryTask(key)); + ExpiryTask.expiryTaskListener = detector; + ExpiryTask.permitExpiration(); + detector.awaitExecuted(3000, MILLISECONDS); + } while (!detector.hasExpired() && detector.wasRescheduled()); + } finally { + ExpiryTask.expiryTaskListener = null; + } + } } -- To stop receiving notification emails like this one, please contact klund@apache.org.