Return-Path: X-Original-To: apmail-brooklyn-commits-archive@minotaur.apache.org Delivered-To: apmail-brooklyn-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 6A535105F7 for ; Wed, 25 Jun 2014 15:06:12 +0000 (UTC) Received: (qmail 95816 invoked by uid 500); 25 Jun 2014 15:06:12 -0000 Delivered-To: apmail-brooklyn-commits-archive@brooklyn.apache.org Received: (qmail 95793 invoked by uid 500); 25 Jun 2014 15:06:12 -0000 Mailing-List: contact commits-help@brooklyn.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@brooklyn.incubator.apache.org Delivered-To: mailing list commits@brooklyn.incubator.apache.org Received: (qmail 95783 invoked by uid 99); 25 Jun 2014 15:06:12 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 25 Jun 2014 15:06:12 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Wed, 25 Jun 2014 15:06:09 +0000 Received: (qmail 93436 invoked by uid 99); 25 Jun 2014 15:05:48 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 25 Jun 2014 15:05:48 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 5AC9C93A94A; Wed, 25 Jun 2014 15:05:48 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: aledsage@apache.org To: commits@brooklyn.incubator.apache.org Date: Wed, 25 Jun 2014 15:05:49 -0000 Message-Id: In-Reply-To: <502c86d92dcf400f9118024e9b730867@git.apache.org> References: <502c86d92dcf400f9118024e9b730867@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/6] git commit: Convert DynamicGroupTest from groovy to java X-Virus-Checked: Checked by ClamAV on apache.org Convert DynamicGroupTest from groovy to java Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/c0fe0eb3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/c0fe0eb3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/c0fe0eb3 Branch: refs/heads/master Commit: c0fe0eb32b6583637802df645d6d9fad7f07e614 Parents: c2faca4 Author: Aled Sage Authored: Tue Jun 17 22:43:36 2014 +0100 Committer: Aled Sage Committed: Wed Jun 18 10:25:25 2014 +0100 ---------------------------------------------------------------------- .../entity/basic/DynamicGroupTest.groovy | 379 ----------------- .../brooklyn/entity/basic/DynamicGroupTest.java | 410 +++++++++++++++++++ 2 files changed, 410 insertions(+), 379 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c0fe0eb3/core/src/test/java/brooklyn/entity/basic/DynamicGroupTest.groovy ---------------------------------------------------------------------- diff --git a/core/src/test/java/brooklyn/entity/basic/DynamicGroupTest.groovy b/core/src/test/java/brooklyn/entity/basic/DynamicGroupTest.groovy deleted file mode 100644 index 19c0c0c..0000000 --- a/core/src/test/java/brooklyn/entity/basic/DynamicGroupTest.groovy +++ /dev/null @@ -1,379 +0,0 @@ -package brooklyn.entity.basic - -import static brooklyn.test.TestUtils.* -import static org.testng.Assert.* - -import java.util.concurrent.CopyOnWriteArrayList -import java.util.concurrent.CountDownLatch -import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic.AtomicInteger - -import org.slf4j.Logger -import org.slf4j.LoggerFactory -import org.testng.annotations.AfterMethod -import org.testng.annotations.BeforeMethod -import org.testng.annotations.Test - -import brooklyn.entity.Entity -import brooklyn.entity.proxying.EntitySpec -import brooklyn.event.SensorEvent -import brooklyn.event.SensorEventListener -import brooklyn.event.basic.BasicAttributeSensor -import brooklyn.test.TestUtils -import brooklyn.test.entity.TestApplication -import brooklyn.test.entity.TestEntity -import brooklyn.util.time.Duration - -import com.google.common.base.Predicate -import com.google.common.base.Predicates -import com.google.common.collect.ImmutableSet -import com.google.common.collect.Sets - -public class DynamicGroupTest { - - private static final Logger LOG = LoggerFactory.getLogger(DynamicGroupTest.class); - - private static final int TIMEOUT_MS = 50*1000; - private static final int VERY_SHORT_WAIT_MS = 100; - - private TestApplication app - private DynamicGroup group - private TestEntity e1 - private TestEntity e2 - - @BeforeMethod(alwaysRun=true) - public void setUp() { - app = ApplicationBuilder.newManagedApp(TestApplication.class); - group = app.createAndManageChild(EntitySpec.create(DynamicGroup.class)); - e1 = app.createAndManageChild(EntitySpec.create(TestEntity.class)); - e2 = app.createAndManageChild(EntitySpec.create(TestEntity.class)); - } - - @AfterMethod(alwaysRun=true) - public void tearDown() throws Exception { - if (app != null) Entities.destroyAll(app.getManagementContext()); - } - - @Test - public void testGroupWithNoFilterReturnsNoMembers() { - assertTrue(group.getMembers().isEmpty()) - } - - @Test - public void testGroupWithNonMatchingFilterReturnsNoMembers() { - group.setEntityFilter( { false } ) - assertTrue(group.getMembers().isEmpty()) - } - - @Test - public void testGroupWithMatchingFilterReturnsOnlyMatchingMembers() { - group.setEntityFilter( { it.getId().equals(e1.getId()) } ) - assertEquals(group.getMembers(), [e1]) - } - - @Test - public void testCanUsePredicateAsFilter() { - Predicate predicate = Predicates.equalTo(e1) - group.setEntityFilter(predicate) - assertEquals(group.getMembers(), [e1]) - } - - @Test - public void testGroupWithMatchingFilterReturnsEverythingThatMatches() { - group.setEntityFilter( { true } ) - assertEquals(ImmutableSet.copyOf(group.getMembers()), [e1, e2, app, group] as Set) - assertEquals(group.getMembers().size(), 4) - } - - @Test - public void testGroupDetectsNewlyManagedMatchingMember() { - Entity e3 = new AbstractEntity() {} - group.setEntityFilter( { it.getId().equals(e3.getId()) } ) - e3.setParent(app); - - assertEquals(group.getMembers(), []) - - Entities.manage(e3); - - executeUntilSucceeds(timeout:TIMEOUT_MS) { - assertEquals(group.getMembers(), [e3]) - } - } - - @Test - public void testGroupUsesNewFilter() { - Entity e3 = new AbstractEntity(app) {} - Entities.manage(e3); - group.setEntityFilter( { it.getId().equals(e3.getId()) } ) - - assertEquals(group.getMembers(), [e3]) - } - - @Test - public void testGroupDetectsChangedEntities() { - final BasicAttributeSensor MY_ATTRIBUTE = [ String, "test.myAttribute", "My test attribute" ] - - group.setEntityFilter( { it.getAttribute(MY_ATTRIBUTE) == "yes" } ) - group.addSubscription(null, MY_ATTRIBUTE) - - assertEquals(group.getMembers(), []) - - // When changed (such that subscription spots it), then entity added - e1.setAttribute(MY_ATTRIBUTE, "yes") - - executeUntilSucceeds(timeout:TIMEOUT_MS) { - assertEquals(group.getMembers(), [e1]) - } - - // When it stops matching, entity is removed - e1.setAttribute(MY_ATTRIBUTE, "no") - - executeUntilSucceeds(timeout:TIMEOUT_MS) { - assertEquals(group.getMembers(), []) - } - } - - @Test - public void testGroupDetectsChangedEntitiesMatchingFilter() { - final BasicAttributeSensor MY_ATTRIBUTE = [ String, "test.myAttribute", "My test attribute" ] - group.setEntityFilter( { - if (!(it.getAttribute(MY_ATTRIBUTE) == "yes")) - return false - if (it == e1) { - LOG.info("testGroupDetectsChangedEntitiesMatchingFilter scanned e1 when MY_ATTRIBUTE is yes; not a bug, but indicates things may be running slowly") - return false - } - return true - } ) - group.addSubscription(null, MY_ATTRIBUTE, { SensorEvent event -> e1 != event.source } as Predicate) - - assertEquals(group.getMembers(), []) - - // Does not subscribe to things which do not match predicate filter, - // so event from e1 should normally be ignored - // but pending rescans may cause it to pick up e1, so we ignore e1 in the entity filter also - e1.setAttribute(MY_ATTRIBUTE, "yes") - e2.setAttribute(MY_ATTRIBUTE, "yes") - - executeUntilSucceeds(timeout:TIMEOUT_MS) { - assertEquals(group.getMembers(), [e2]) - } - } - - @Test - public void testGroupRemovesUnmanagedEntity() { - group.setEntityFilter( { it.getId().equals(e1.getId()) } ) - assertEquals(group.getMembers(), [e1]) - - Entities.unmanage(e1) - - executeUntilSucceeds(timeout:TIMEOUT_MS) { - assertEquals(group.getMembers(), []) - } - } - - @Test - public void testStoppedGroupIgnoresComingAndGoingsOfEntities() { - Entity e3 = new AbstractEntity() {} - group.setEntityFilter( { it instanceof TestEntity } ) - assertEquals(ImmutableSet.copyOf(group.getMembers()), [e1, e2] as Set) - group.stop() - - e3.setParent(app) - Entities.manage(e3) - assertSucceedsContinually(timeout:VERY_SHORT_WAIT_MS) { - assertEquals(ImmutableSet.copyOf(group.getMembers()), [e1, e2] as Set) - } - - Entities.unmanage(e1) - assertSucceedsContinually(timeout:VERY_SHORT_WAIT_MS) { - assertEquals(ImmutableSet.copyOf(group.getMembers()), [e1, e2] as Set) - } - } - - - // Motivated by strange behavior observed testing load-balancing policy, but this passed... - // - // Note that addMember/removeMember is now async for when member-entity is managed/unmanaged, - // so to avoid race where entity is already unmanaged by the time addMember does its stuff, - // we wait for it to really be added. - @Test - public void testGroupAddsAndRemovesManagedAndUnmanagedEntitiesExactlyOnce() { - int NUM_CYCLES = 100 - group.setEntityFilter( { it instanceof TestEntity } ) - Set entitiesNotified = [] as Set - AtomicInteger notificationCount = new AtomicInteger(0); - List exceptions = new CopyOnWriteArrayList() - - app.subscribe(group, DynamicGroup.MEMBER_ADDED, { SensorEvent event -> - try { - LOG.debug("Notified of member added: member={}, thread={}", event.getValue(), Thread.currentThread().getName()); - Entity source = event.getSource() - Object val = event.getValue() - assertEquals(group, event.getSource()) - assertTrue(entitiesNotified.add(val)) - notificationCount.incrementAndGet() - } catch (Throwable t) { - LOG.error("Error on event $event", t); - exceptions.add(new Exception("Error on event $event", t)) - } - } as SensorEventListener); - - app.subscribe(group, DynamicGroup.MEMBER_REMOVED, { SensorEvent event -> - try { - LOG.debug("Notified of member removed: member={}, thread={}", event.getValue(), Thread.currentThread().getName()); - Entity source = event.getSource() - Object val = event.getValue() - assertEquals(group, event.getSource()) - assertTrue(entitiesNotified.remove(val)) - notificationCount.incrementAndGet() - } catch (Throwable t) { - LOG.error("Error on event $event", t); - exceptions.add(new Exception("Error on event $event", t)) - } - } as SensorEventListener); - - for (i in 1..NUM_CYCLES) { - TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)); - executeUntilSucceeds { - entitiesNotified.contains(entity); - } - Entities.unmanage(entity); - } - - TestUtils.executeUntilSucceeds(timeout:Duration.TEN_SECONDS) { - return notificationCount.get() == (NUM_CYCLES*2) || exceptions.size() > 0 - } - - if (exceptions.size() > 0) { - throw exceptions.get(0) - } - - assertEquals(notificationCount.get(), NUM_CYCLES*2) - } - - // The entityAdded/entityRemoved is now async for when member-entity is managed/unmanaged, - // but it should always be called sequentially (i.e. semantics of a single-threaded executor). - // Test is deliberately slow in processing entityAdded/removed calls, to try to cause - // concurrent calls if they are going to happen at all. - @Test(groups="Integration") - public void testEntityAddedAndRemovedCalledSequentially() { - int NUM_CYCLES = 10; - final Set knownMembers = Sets.newLinkedHashSet(); - final AtomicInteger notificationCount = new AtomicInteger(0); - final AtomicInteger concurrentCallsCount = new AtomicInteger(0); - final List exceptions = new CopyOnWriteArrayList(); - - DynamicGroup group2 = new DynamicGroupImpl() { - @Override protected void onEntityAdded(Entity item) { - try { - onCall("Member added: member="+item); - assertTrue(knownMembers.add(item)); - } catch (Throwable t) { - exceptions.add(new Exception("Error detected adding "+item, t)); - throw t; - } - } - @Override protected void onEntityRemoved(Entity item) { - try { - onCall("Member removed: member="+item); - assertTrue(knownMembers.remove(item)); - } catch (Throwable t) { - exceptions.add(new Exception("Error detected adding "+item, t)); - throw t; - } - } - private void onCall(String msg) { - LOG.debug(msg+", thread="+Thread.currentThread().getName()); - try { - assertEquals(concurrentCallsCount.incrementAndGet(), 1); - Thread.sleep(100); - } finally { - concurrentCallsCount.decrementAndGet(); - } - notificationCount.incrementAndGet(); - } - }; - ((EntityLocal)group2).setConfig(DynamicGroup.ENTITY_FILTER, Predicates.instanceOf(TestEntity.class)); - app.addChild(group2); - group2.init(); - Entities.manage(group2); - - for (int i = 0; i < NUM_CYCLES; i++) { - TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)); - Entities.unmanage(entity); - } - - TestUtils.executeUntilSucceeds(timeout:new groovy.time.TimeDuration(0, 0, 10, 0)) { - return notificationCount.get() == (NUM_CYCLES*2) || exceptions.size() > 0; - } - - if (exceptions.size() > 0) { - throw exceptions.get(0); - } - - assertEquals(notificationCount.get(), NUM_CYCLES*2); - } - - // See Deadlock in https://github.com/brooklyncentral/brooklyn/issues/378 - @Test - public void testDoesNotDeadlockOnManagedAndMemberAddedConcurrently() throws Exception { - final CountDownLatch rescanReachedLatch = new CountDownLatch(1); - final CountDownLatch entityAddedReachedLatch = new CountDownLatch(1); - final CountDownLatch rescanLatch = new CountDownLatch(1); - final CountDownLatch entityAddedLatch = new CountDownLatch(1); - - final TestEntity e3 = app.addChild(EntitySpec.create(TestEntity.class)); - Predicate filter = Predicates.equalTo(e3); - - DynamicGroup group2 = new DynamicGroupImpl() { - @Override public void rescanEntities() { - rescanReachedLatch.countDown(); - rescanLatch.await(); - super.rescanEntities(); - } - @Override protected void onEntityAdded(Entity item) { - entityAddedReachedLatch.countDown(); - entityAddedLatch.await(); - super.onEntityAdded(item); - } - }; - ((EntityLocal)group2).setConfig(DynamicGroup.ENTITY_FILTER, filter); - app.addChild(group2); - group2.init(); - - Thread t1 = new Thread(new Runnable() { - @Override public void run() { - Entities.manage(group2); - }}); - - Thread t2 = new Thread(new Runnable() { - @Override public void run() { - Entities.manage(e3); - }}); - - t1.start(); - try { - assertTrue(rescanReachedLatch.await(TIMEOUT_MS, TimeUnit.MILLISECONDS)); - - t2.start(); - assertTrue(entityAddedReachedLatch.await(TIMEOUT_MS, TimeUnit.MILLISECONDS)); - - entityAddedLatch.countDown(); - rescanLatch.countDown(); - - t2.join(TIMEOUT_MS); - t1.join(TIMEOUT_MS); - assertFalse(t1.isAlive()); - assertFalse(t2.isAlive()); - - } finally { - t1.interrupt(); - t2.interrupt(); - } - - executeUntilSucceeds(timeout:TIMEOUT_MS) { - assertEquals(group2.getMembers(), [e3]); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c0fe0eb3/core/src/test/java/brooklyn/entity/basic/DynamicGroupTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/brooklyn/entity/basic/DynamicGroupTest.java b/core/src/test/java/brooklyn/entity/basic/DynamicGroupTest.java new file mode 100644 index 0000000..cd3ebdd --- /dev/null +++ b/core/src/test/java/brooklyn/entity/basic/DynamicGroupTest.java @@ -0,0 +1,410 @@ +package brooklyn.entity.basic; + +import static brooklyn.test.Asserts.assertEqualsIgnoringOrder; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + +import java.util.List; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import brooklyn.entity.Entity; +import brooklyn.entity.proxying.EntitySpec; +import brooklyn.event.AttributeSensor; +import brooklyn.event.SensorEvent; +import brooklyn.event.SensorEventListener; +import brooklyn.event.basic.Sensors; +import brooklyn.test.Asserts; +import brooklyn.test.entity.TestApplication; +import brooklyn.test.entity.TestEntity; +import brooklyn.util.collections.MutableMap; +import brooklyn.util.exceptions.Exceptions; +import brooklyn.util.time.Time; + +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; + +public class DynamicGroupTest { + + private static final Logger LOG = LoggerFactory.getLogger(DynamicGroupTest.class); + + private static final int TIMEOUT_MS = 50*1000; + private static final int VERY_SHORT_WAIT_MS = 100; + + private TestApplication app; + private DynamicGroup group; + private TestEntity e1; + private TestEntity e2; + + @BeforeMethod(alwaysRun=true) + public void setUp() { + app = ApplicationBuilder.newManagedApp(TestApplication.class); + group = app.createAndManageChild(EntitySpec.create(DynamicGroup.class)); + e1 = app.createAndManageChild(EntitySpec.create(TestEntity.class)); + e2 = app.createAndManageChild(EntitySpec.create(TestEntity.class)); + } + + @AfterMethod(alwaysRun=true) + public void tearDown() throws Exception { + if (app != null) Entities.destroyAll(app.getManagementContext()); + } + + @Test + public void testGroupWithNoFilterReturnsNoMembers() throws Exception { + assertTrue(group.getMembers().isEmpty()); + } + + @Test + public void testGroupWithNonMatchingFilterReturnsNoMembers() throws Exception { + group.setEntityFilter(Predicates.alwaysFalse()); + assertTrue(group.getMembers().isEmpty()); + } + + @Test + public void testGroupWithMatchingFilterReturnsOnlyMatchingMembers() throws Exception { + group.setEntityFilter(EntityPredicates.idEqualTo(e1.getId())); + assertEqualsIgnoringOrder(group.getMembers(), ImmutableList.of(e1)); + } + + @Test + public void testCanUsePredicateAsFilter() throws Exception { + Predicate predicate = Predicates.equalTo(e1); + group.setEntityFilter(predicate); + assertEqualsIgnoringOrder(group.getMembers(), ImmutableSet.of(e1)); + } + + @Test + public void testGroupWithMatchingFilterReturnsEverythingThatMatches() throws Exception { + group.setEntityFilter(Predicates.alwaysTrue()); + assertEqualsIgnoringOrder(group.getMembers(), ImmutableSet.of(e1, e2, app, group)); + } + + @Test + public void testGroupDetectsNewlyManagedMatchingMember() throws Exception { + final Entity e3 = new AbstractEntity() {}; + group.setEntityFilter(EntityPredicates.idEqualTo(e3.getId())); + e3.setParent(app); + + assertEqualsIgnoringOrder(group.getMembers(), ImmutableSet.of()); + + Entities.manage(e3); + + Asserts.succeedsEventually(new Runnable() { + public void run() { + assertEqualsIgnoringOrder(group.getMembers(), ImmutableSet.of(e3)); + }}); + } + + @Test + public void testGroupUsesNewFilter() throws Exception { + Entity e3 = new AbstractEntity(app) {}; + Entities.manage(e3); + group.setEntityFilter(EntityPredicates.idEqualTo(e3.getId())); + + assertEqualsIgnoringOrder(group.getMembers(), ImmutableSet.of(e3)); + } + + @Test + public void testGroupDetectsChangedEntities() throws Exception { + final AttributeSensor MY_ATTRIBUTE = Sensors.newStringSensor("test.myAttribute", "My test attribute"); + + group.setEntityFilter(EntityPredicates.attributeEqualTo(MY_ATTRIBUTE, "yes")); + group.addSubscription(null, MY_ATTRIBUTE); + + assertEqualsIgnoringOrder(group.getMembers(), ImmutableSet.of()); + + // When changed (such that subscription spots it), then entity added + e1.setAttribute(MY_ATTRIBUTE, "yes"); + + Asserts.succeedsEventually(new Runnable() { + public void run() { + assertEqualsIgnoringOrder(group.getMembers(), ImmutableSet.of(e1)); + }}); + + // When it stops matching, entity is removed + e1.setAttribute(MY_ATTRIBUTE, "no"); + + Asserts.succeedsEventually(new Runnable() { + public void run() { + assertEqualsIgnoringOrder(group.getMembers(), ImmutableSet.of()); + }}); + } + + @Test + public void testGroupDetectsChangedEntitiesMatchingFilter() throws Exception { + final AttributeSensor MY_ATTRIBUTE = Sensors.newStringSensor("test.myAttribute", "My test attribute"); + group.setEntityFilter(new Predicate() { + @Override public boolean apply(Entity input) { + if (!(input.getAttribute(MY_ATTRIBUTE) == "yes")) + return false; + if (input.equals(e1)) { + LOG.info("testGroupDetectsChangedEntitiesMatchingFilter scanned e1 when MY_ATTRIBUTE is yes; not a bug, but indicates things may be running slowly"); + return false; + } + return true; + }}); + group.addSubscription(null, MY_ATTRIBUTE, new Predicate>() { + @Override public boolean apply(SensorEvent input) { + return !e1.equals(input.getSource()); + }}); + + assertEqualsIgnoringOrder(group.getMembers(), ImmutableSet.of()); + + // Does not subscribe to things which do not match predicate filter, + // so event from e1 should normally be ignored + // but pending rescans may cause it to pick up e1, so we ignore e1 in the entity filter also + e1.setAttribute(MY_ATTRIBUTE, "yes"); + e2.setAttribute(MY_ATTRIBUTE, "yes"); + + Asserts.succeedsEventually(new Runnable() { + public void run() { + assertEqualsIgnoringOrder(group.getMembers(), ImmutableSet.of(e2)); + }}); + } + + @Test + public void testGroupRemovesUnmanagedEntity() throws Exception { + group.setEntityFilter(EntityPredicates.idEqualTo(e1.getId())); + assertEqualsIgnoringOrder(group.getMembers(), ImmutableSet.of(e1)); + + Entities.unmanage(e1); + + Asserts.succeedsEventually(new Runnable() { + public void run() { + assertEqualsIgnoringOrder(group.getMembers(), ImmutableSet.of()); + }}); + } + + @Test + public void testStoppedGroupIgnoresComingAndGoingsOfEntities() throws Exception { + Entity e3 = new AbstractEntity() {}; + group.setEntityFilter(Predicates.instanceOf(TestEntity.class)); + assertEqualsIgnoringOrder(group.getMembers(), ImmutableSet.of(e1, e2)); + group.stop(); + + e3.setParent(app); + Entities.manage(e3); + Asserts.succeedsContinually(MutableMap.of("timeout", VERY_SHORT_WAIT_MS), new Runnable() { + public void run() { + assertEquals(ImmutableSet.copyOf(group.getMembers()), ImmutableSet.of(e1, e2)); + }}); + + Entities.unmanage(e1); + Asserts.succeedsContinually(MutableMap.of("timeout", VERY_SHORT_WAIT_MS), new Runnable() { + public void run() { + assertEqualsIgnoringOrder(ImmutableSet.copyOf(group.getMembers()), ImmutableSet.of(e1, e2)); + }}); + } + + + // Motivated by strange behavior observed testing load-balancing policy, but this passed... + // + // Note that addMember/removeMember is now async for when member-entity is managed/unmanaged, + // so to avoid race where entity is already unmanaged by the time addMember does its stuff, + // we wait for it to really be added. + @Test + public void testGroupAddsAndRemovesManagedAndUnmanagedEntitiesExactlyOnce() throws Exception { + final int NUM_CYCLES = 100; + group.setEntityFilter(Predicates.instanceOf(TestEntity.class)); + final Set entitiesNotified = Sets.newLinkedHashSet(); + final AtomicInteger notificationCount = new AtomicInteger(0); + final List exceptions = new CopyOnWriteArrayList(); + + app.subscribe(group, DynamicGroup.MEMBER_ADDED, new SensorEventListener() { + public void onEvent(SensorEvent event) { + try { + LOG.debug("Notified of member added: member={}, thread={}", event.getValue(), Thread.currentThread().getName()); + Entity source = event.getSource(); + Object val = event.getValue(); + assertEquals(group, event.getSource()); + assertTrue(entitiesNotified.add((TestEntity)val)); + notificationCount.incrementAndGet(); + } catch (Throwable t) { + LOG.error("Error on event $event", t); + exceptions.add(new Exception("Error on event $event", t)); + } + }}); + + app.subscribe(group, DynamicGroup.MEMBER_REMOVED, new SensorEventListener() { + public void onEvent(SensorEvent event) { + try { + LOG.debug("Notified of member removed: member={}, thread={}", event.getValue(), Thread.currentThread().getName()); + Entity source = event.getSource(); + Object val = event.getValue(); + assertEquals(group, event.getSource()); + assertTrue(entitiesNotified.remove(val)); + notificationCount.incrementAndGet(); + } catch (Throwable t) { + LOG.error("Error on event $event", t); + exceptions.add(new Exception("Error on event $event", t)); + } + }}); + + for (int i = 0; i < NUM_CYCLES; i++) { + final TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)); + Asserts.succeedsEventually(new Runnable() { + public void run() { + entitiesNotified.contains(entity); + }}); + Entities.unmanage(entity); + } + + Asserts.succeedsEventually(new Runnable() { + public void run() { + assertTrue(notificationCount.get() == (NUM_CYCLES*2) || exceptions.size() > 0); + }}); + + if (exceptions.size() > 0) { + throw exceptions.get(0); + } + + assertEquals(notificationCount.get(), NUM_CYCLES*2); + } + + // The entityAdded/entityRemoved is now async for when member-entity is managed/unmanaged, + // but it should always be called sequentially (i.e. semantics of a single-threaded executor). + // Test is deliberately slow in processing entityAdded/removed calls, to try to cause + // concurrent calls if they are going to happen at all. + @Test(groups="Integration") + public void testEntityAddedAndRemovedCalledSequentially() throws Exception { + final int NUM_CYCLES = 10; + final Set knownMembers = Sets.newLinkedHashSet(); + final AtomicInteger notificationCount = new AtomicInteger(0); + final AtomicInteger concurrentCallsCount = new AtomicInteger(0); + final List exceptions = new CopyOnWriteArrayList(); + + DynamicGroupImpl group2 = new DynamicGroupImpl() { + @Override protected void onEntityAdded(Entity item) { + try { + onCall("Member added: member="+item); + assertTrue(knownMembers.add(item)); + } catch (Throwable t) { + exceptions.add(new Exception("Error detected adding "+item, t)); + throw Exceptions.propagate(t); + } + } + @Override protected void onEntityRemoved(Entity item) { + try { + onCall("Member removed: member="+item); + assertTrue(knownMembers.remove(item)); + } catch (Throwable t) { + exceptions.add(new Exception("Error detected adding "+item, t)); + throw Exceptions.propagate(t); + } + } + private void onCall(String msg) { + LOG.debug(msg+", thread="+Thread.currentThread().getName()); + try { + assertEquals(concurrentCallsCount.incrementAndGet(), 1); + Time.sleep(100); + } finally { + concurrentCallsCount.decrementAndGet(); + } + notificationCount.incrementAndGet(); + } + }; + ((EntityLocal)group2).setConfig(DynamicGroup.ENTITY_FILTER, Predicates.instanceOf(TestEntity.class)); + app.addChild(group2); + group2.init(); + Entities.manage(group2); + + for (int i = 0; i < NUM_CYCLES; i++) { + TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)); + Entities.unmanage(entity); + } + + Asserts.succeedsEventually(new Runnable() { + public void run() { + assertTrue(notificationCount.get() == (NUM_CYCLES*2) || exceptions.size() > 0); + }}); + + if (exceptions.size() > 0) { + throw exceptions.get(0); + } + + assertEquals(notificationCount.get(), NUM_CYCLES*2); + } + + // See Deadlock in https://github.com/brooklyncentral/brooklyn/issues/378 + @Test + public void testDoesNotDeadlockOnManagedAndMemberAddedConcurrently() throws Exception { + final CountDownLatch rescanReachedLatch = new CountDownLatch(1); + final CountDownLatch entityAddedReachedLatch = new CountDownLatch(1); + final CountDownLatch rescanLatch = new CountDownLatch(1); + final CountDownLatch entityAddedLatch = new CountDownLatch(1); + + final TestEntity e3 = app.addChild(EntitySpec.create(TestEntity.class)); + + final DynamicGroupImpl group2 = new DynamicGroupImpl() { + @Override public void rescanEntities() { + rescanReachedLatch.countDown(); + try { + rescanLatch.await(); + } catch (InterruptedException e) { + Exceptions.propagate(e); + } + super.rescanEntities(); + } + @Override protected void onEntityAdded(Entity item) { + entityAddedReachedLatch.countDown(); + try { + entityAddedLatch.await(); + } catch (InterruptedException e) { + Exceptions.propagate(e); + } + super.onEntityAdded(item); + } + }; + ((EntityLocal)group2).setConfig(DynamicGroup.ENTITY_FILTER, Predicates.equalTo(e3)); + app.addChild(group2); + group2.init(); + + Thread t1 = new Thread(new Runnable() { + @Override public void run() { + Entities.manage(group2); + }}); + + Thread t2 = new Thread(new Runnable() { + @Override public void run() { + Entities.manage(e3); + }}); + + t1.start(); + try { + assertTrue(rescanReachedLatch.await(TIMEOUT_MS, TimeUnit.MILLISECONDS)); + + t2.start(); + assertTrue(entityAddedReachedLatch.await(TIMEOUT_MS, TimeUnit.MILLISECONDS)); + + entityAddedLatch.countDown(); + rescanLatch.countDown(); + + t2.join(TIMEOUT_MS); + t1.join(TIMEOUT_MS); + assertFalse(t1.isAlive()); + assertFalse(t2.isAlive()); + + } finally { + t1.interrupt(); + t2.interrupt(); + } + + Asserts.succeedsEventually(new Runnable() { + public void run() { + assertEqualsIgnoringOrder(group2.getMembers(), ImmutableSet.of(e3)); + }}); + } +}