Return-Path: X-Original-To: apmail-geode-commits-archive@minotaur.apache.org Delivered-To: apmail-geode-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 55D8518E39 for ; Thu, 25 Feb 2016 00:07:56 +0000 (UTC) Received: (qmail 44627 invoked by uid 500); 25 Feb 2016 00:07:51 -0000 Delivered-To: apmail-geode-commits-archive@geode.apache.org Received: (qmail 44594 invoked by uid 500); 25 Feb 2016 00:07:51 -0000 Mailing-List: contact commits-help@geode.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.incubator.apache.org Delivered-To: mailing list commits@geode.incubator.apache.org Received: (qmail 44585 invoked by uid 99); 25 Feb 2016 00:07:51 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 25 Feb 2016 00:07:51 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id AD1B9C04EC for ; Thu, 25 Feb 2016 00:07:50 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -3.549 X-Spam-Level: X-Spam-Status: No, score=-3.549 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.329] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id Kgqiv2z090IH for ; Thu, 25 Feb 2016 00:07:39 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with SMTP id 13C295FAC5 for ; Thu, 25 Feb 2016 00:07:37 +0000 (UTC) Received: (qmail 69536 invoked by uid 99); 24 Feb 2016 23:40:56 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 24 Feb 2016 23:40:56 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 7323BE8ED7; Wed, 24 Feb 2016 23:40:56 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: udo@apache.org To: commits@geode.incubator.apache.org Date: Wed, 24 Feb 2016 23:40:59 -0000 Message-Id: In-Reply-To: <2311b0d530c54a04b6d2b4c076d78306@git.apache.org> References: <2311b0d530c54a04b6d2b4c076d78306@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [4/7] incubator-geode git commit: GEODE-870: Rejecting of old view GEODE-870: Rejecting of old view Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/036f2205 Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/036f2205 Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/036f2205 Branch: refs/heads/feature/GEODE-870 Commit: 036f22059aac18eeb3cb84c016d671d6bb8b7343 Parents: d50623b Author: Udo Kohlmeyer Authored: Mon Feb 1 14:08:32 2016 +1100 Committer: Udo Kohlmeyer Committed: Thu Feb 25 10:39:49 2016 +1100 ---------------------------------------------------------------------- .../gms/messages/ViewRejectMessage.java | 96 +++ .../membership/gms/membership/GMSJoinLeave.java | 92 +-- .../internal/DataSerializableFixedID.java | 3 +- .../gemfire/distributed/LocatorDUnitTest.java | 816 +++++++++++-------- .../test/dunit/SerializableRunnable.java | 12 +- 5 files changed, 608 insertions(+), 411 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/036f2205/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/ViewRejectMessage.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/ViewRejectMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/ViewRejectMessage.java new file mode 100755 index 0000000..e5bf9e2 --- /dev/null +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/ViewRejectMessage.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.distributed.internal.membership.gms.messages; + +import com.gemstone.gemfire.DataSerializer; +import com.gemstone.gemfire.distributed.internal.DistributionManager; +import com.gemstone.gemfire.distributed.internal.HighPriorityDistributionMessage; +import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember; +import com.gemstone.gemfire.distributed.internal.membership.NetView; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +public class ViewRejectMessage extends HighPriorityDistributionMessage { + + private int viewId; + private NetView rejectedView; + private String reason; + + public ViewRejectMessage(InternalDistributedMember recipient, int viewId, NetView rejectedView, String reason) { + super(); + setRecipient(recipient); + this.viewId = viewId; + this.rejectedView = rejectedView; + this.reason = reason; + } + + public ViewRejectMessage() { + // no-arg constructor for serialization + } + + public int getViewId() { + return viewId; + } + + public NetView getRejectedView() { + return this.rejectedView; + } + + + @Override + public int getDSFID() { + // TODO Auto-generated method stub + return VIEW_REJECT_MESSAGE; + } + + public String getReason() { + return reason; + } + + @Override + public int getProcessorType() { + return 0; + } + + @Override + public void process(DistributionManager dm) { + throw new IllegalStateException("this message is not intended to execute in a thread pool"); + } + + @Override + public void toData(DataOutput out) throws IOException { + super.toData(out); + out.writeInt(this.viewId); + DataSerializer.writeObject(this.rejectedView, out); + } + + @Override + public void fromData(DataInput in) throws IOException, ClassNotFoundException { + super.fromData(in); + this.viewId = in.readInt(); + this.rejectedView = DataSerializer.readObject(in); + } + + @Override + public String toString() { + String s = getSender() == null? getRecipientsDescription() : ""+getSender(); + return "ViewRejectMessage("+s+"; "+this.viewId+"; rejectedView="+this.rejectedView +")"; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/036f2205/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java index 5d34041..c7eacfa 100755 --- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java @@ -16,41 +16,6 @@ */ package com.gemstone.gemfire.distributed.internal.membership.gms.membership; -import static com.gemstone.gemfire.internal.DataSerializableFixedID.FIND_COORDINATOR_REQ; -import static com.gemstone.gemfire.internal.DataSerializableFixedID.FIND_COORDINATOR_RESP; -import static com.gemstone.gemfire.internal.DataSerializableFixedID.INSTALL_VIEW_MESSAGE; -import static com.gemstone.gemfire.internal.DataSerializableFixedID.JOIN_REQUEST; -import static com.gemstone.gemfire.internal.DataSerializableFixedID.JOIN_RESPONSE; -import static com.gemstone.gemfire.internal.DataSerializableFixedID.LEAVE_REQUEST_MESSAGE; -import static com.gemstone.gemfire.internal.DataSerializableFixedID.NETWORK_PARTITION_MESSAGE; -import static com.gemstone.gemfire.internal.DataSerializableFixedID.REMOVE_MEMBER_REQUEST; -import static com.gemstone.gemfire.internal.DataSerializableFixedID.VIEW_ACK_MESSAGE; -import static com.gemstone.gemfire.distributed.internal.membership.gms.ServiceConfig.MEMBER_REQUEST_COLLECTION_INTERVAL; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TimerTask; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.logging.log4j.Logger; - import com.gemstone.gemfire.GemFireConfigException; import com.gemstone.gemfire.SystemConnectException; import com.gemstone.gemfire.distributed.DistributedMember; @@ -68,24 +33,26 @@ import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.JoinL import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.MessageHandler; import com.gemstone.gemfire.distributed.internal.membership.gms.locator.FindCoordinatorRequest; import com.gemstone.gemfire.distributed.internal.membership.gms.locator.FindCoordinatorResponse; -import com.gemstone.gemfire.distributed.internal.membership.gms.messages.HasMemberID; -import com.gemstone.gemfire.distributed.internal.membership.gms.messages.InstallViewMessage; -import com.gemstone.gemfire.distributed.internal.membership.gms.messages.JoinRequestMessage; -import com.gemstone.gemfire.distributed.internal.membership.gms.messages.JoinResponseMessage; -import com.gemstone.gemfire.distributed.internal.membership.gms.messages.LeaveRequestMessage; -import com.gemstone.gemfire.distributed.internal.membership.gms.messages.NetworkPartitionMessage; -import com.gemstone.gemfire.distributed.internal.membership.gms.messages.RemoveMemberMessage; -import com.gemstone.gemfire.distributed.internal.membership.gms.messages.ViewAckMessage; +import com.gemstone.gemfire.distributed.internal.membership.gms.messages.*; import com.gemstone.gemfire.distributed.internal.tcpserver.TcpClient; import com.gemstone.gemfire.internal.Version; import com.gemstone.gemfire.internal.i18n.LocalizedStrings; import com.gemstone.gemfire.security.AuthenticationFailedException; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +import static com.gemstone.gemfire.distributed.internal.membership.gms.ServiceConfig.MEMBER_REQUEST_COLLECTION_INTERVAL; +import static com.gemstone.gemfire.internal.DataSerializableFixedID.*; /** * GMSJoinLeave handles membership communication with other processes in the * distributed system. It replaces the JGroups channel membership services * that Geode formerly used for this purpose. - * */ public class GMSJoinLeave implements JoinLeave, MessageHandler { @@ -221,7 +188,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { try { if (Boolean.getBoolean(BYPASS_DISCOVERY_PROPERTY)) { - synchronized(viewInstallationLock) { + synchronized (viewInstallationLock) { becomeCoordinator(); } return true; @@ -391,7 +358,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { } return response; } - + @Override public boolean isMemberLeaving(DistributedMember mbr) { if (getPendingRequestIDs(LEAVE_REQUEST_MESSAGE).contains(mbr) @@ -471,8 +438,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { recordViewRequest(incomingRequest); return; } - - + InternalDistributedMember mbr = incomingRequest.getMemberID(); if (logger.isDebugEnabled()) { @@ -543,7 +509,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { if (!fromMe) { logger.info("Membership received a request to remove " + mbr - + " from " + incomingRequest.getSender() + + " from " + incomingRequest.getSender() + " reason="+incomingRequest.getReason()); } @@ -630,7 +596,6 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { public void delayViewCreationForTest(int millis) { requestCollectionInterval = millis; } - /** * Transitions this member into the coordinator role. This must @@ -872,6 +837,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { void setTcpClientWrapper(TcpClientWrapper tcpClientWrapper) { this.tcpClientWrapper = tcpClientWrapper; } + /** * This contacts the locators to find out who the current coordinator is. * All locators are contacted. If they don't agree then we choose the oldest @@ -940,8 +906,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { } } } while (!anyResponses && System.currentTimeMillis() < giveUpTime); - - + if (coordinators.isEmpty()) { return false; } @@ -1523,6 +1488,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { /*** * test method + * * @return ViewReplyProcessor */ protected ViewReplyProcessor getPrepareViewReplyProcessor() { @@ -1558,6 +1524,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { boolean isWaiting(){ return waiting; } + synchronized void processPendingRequests(Set pendingLeaves, Set pendingRemovals) { // there's no point in waiting for members who have already // requested to leave or who have been declared crashed. @@ -1711,7 +1678,6 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { services.getMessenger().sendUnreliably(msg); } } - } class ViewCreator extends Thread { @@ -2094,7 +2060,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { } logger.debug("unresponsive members that could not be reached: {}", unresponsive); - + List failures = new ArrayList<>(currentView.getCrashedMembers().size() + unresponsive.size()); if (conflictingView != null && !conflictingView.getCreator().equals(localAddress) && conflictingView.getViewId() > newView.getViewId() @@ -2188,10 +2154,10 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { filterMembers(suspects, newRemovals, REMOVE_MEMBER_REQUEST); filterMembers(suspects, newLeaves, LEAVE_REQUEST_MESSAGE); - newRemovals.removeAll(newLeaves); // if we received a Leave req the member is "healthy" - + newRemovals.removeAll(newLeaves); // if we received a Leave req the member is "healthy" + suspects.removeAll(newLeaves); - + for (InternalDistributedMember mbr : suspects) { if (newRemovals.contains(mbr) || newLeaves.contains(mbr)) { continue; // no need to check this member - it's already been checked or is leaving @@ -2215,7 +2181,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { } }); } - + if (checkers.isEmpty()) { logger.debug("all unresponsive members are already scheduled to be removed"); return; @@ -2236,19 +2202,19 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { long giveUpTime = System.currentTimeMillis() + viewAckTimeout; // submit the tasks that will remove dead members from the suspects collection submitAll(svc, checkers); - + // now wait for the tasks to do their work long waitTime = giveUpTime - System.currentTimeMillis(); synchronized (viewRequests) { while ( waitTime > 0 ) { logger.debug("removeHealthyMembers: mbrs" + suspects.size()); - + filterMembers(suspects, newRemovals, REMOVE_MEMBER_REQUEST); filterMembers(suspects, newLeaves, LEAVE_REQUEST_MESSAGE); newRemovals.removeAll(newLeaves); - + suspects.removeAll(newLeaves); - + if(suspects.isEmpty() || newRemovals.containsAll(suspects)) { break; } @@ -2270,7 +2236,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { */ protected void filterMembers(Collection mbrs, Set matchingMembers, short requestType) { Set requests = getPendingRequestIDs(requestType); - + if(!requests.isEmpty()) { logger.debug("filterMembers: processing " + requests.size() + " requests for type " + requestType); Iterator itr = requests.iterator(); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/036f2205/geode-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java index 7b263bf..22ac457 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java @@ -82,7 +82,8 @@ public interface DataSerializableFixedID extends SerializationVersions { case FOO: return new FOO(in); */ - + public static final short VIEW_REJECT_MESSAGE = -158; + public static final short NETWORK_PARTITION_MESSAGE = -157; public static final short SUSPECT_MEMBERS_MESSAGE = -156; http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/036f2205/geode-core/src/test/java/com/gemstone/gemfire/distributed/LocatorDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/com/gemstone/gemfire/distributed/LocatorDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/distributed/LocatorDUnitTest.java index 25ce97e..1917692 100644 --- a/geode-core/src/test/java/com/gemstone/gemfire/distributed/LocatorDUnitTest.java +++ b/geode-core/src/test/java/com/gemstone/gemfire/distributed/LocatorDUnitTest.java @@ -33,12 +33,7 @@ import com.gemstone.gemfire.cache.Cache; import com.gemstone.gemfire.cache.CacheFactory; import com.gemstone.gemfire.cache.Region; import com.gemstone.gemfire.cache.RegionShortcut; -import com.gemstone.gemfire.distributed.internal.DistributionConfig; -import com.gemstone.gemfire.distributed.internal.DistributionException; -import com.gemstone.gemfire.distributed.internal.DistributionManager; -import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; -import com.gemstone.gemfire.distributed.internal.InternalLocator; -import com.gemstone.gemfire.distributed.internal.MembershipListener; +import com.gemstone.gemfire.distributed.internal.*; import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember; import com.gemstone.gemfire.distributed.internal.membership.MembershipManager; import com.gemstone.gemfire.distributed.internal.membership.MembershipTestHook; @@ -180,8 +175,7 @@ public class LocatorDUnitTest extends DistributedTestCase { service.lock("foo2", 0, 0); } }); - - + // cause elder failover. vm1 will become the lock grantor system.disconnect(); @@ -195,6 +189,7 @@ public class LocatorDUnitTest extends DistributedTestCase { public boolean done() { return service.isLockGrantor(); } + @Override public String description() { return "waiting to become lock grantor after shutting down locator/grantor"; @@ -323,11 +318,11 @@ public class LocatorDUnitTest extends DistributedTestCase { async2.join(); Object result1 = async1.getReturnValue(); if (result1 instanceof Exception) { - throw (Exception)result1; + throw (Exception) result1; } Object result2 = async2.getReturnValue(); if (result2 instanceof Exception) { - throw (Exception)result2; + throw (Exception) result2; } // verify that they found each other SerializableCallable verify = new SerializableCallable("verify no split-brain") { @@ -338,7 +333,7 @@ public class LocatorDUnitTest extends DistributedTestCase { } Assert.assertTrue(sys.getDM().getViewMembers().size() == 2, "expected 2 members but found " + sys.getDM().getViewMembers().size() - ); + ); return true; } }; @@ -359,8 +354,8 @@ public class LocatorDUnitTest extends DistributedTestCase { loc1.invoke(r); } } - } + /** * test lead member selection */ @@ -370,7 +365,7 @@ public class LocatorDUnitTest extends DistributedTestCase { VM vm1 = host.getVM(1); VM vm2 = host.getVM(2); VM vm3 = host.getVM(3); - + port1 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); DistributedTestUtils.deleteLocatorStateFile(port1); final String locators = NetworkUtils.getServerHostName(host) + "[" + port1 + "]"; @@ -379,69 +374,69 @@ public class LocatorDUnitTest extends DistributedTestCase { properties.put("locators", locators); properties.put("enable-network-partition-detection", "true"); properties.put("disable-auto-reconnect", "true"); - + File logFile = new File(""); if (logFile.exists()) { logFile.delete(); } Locator locator = Locator.startLocatorAndDS(port1, logFile, properties); try { - DistributedSystem sys = locator.getDistributedSystem(); - - Object[] connectArgs = new Object[]{ properties }; - - SerializableRunnable disconnect = - new SerializableRunnable("Disconnect from " + locators) { - public void run() { - DistributedSystem sys = InternalDistributedSystem.getAnyInstance(); - if (sys != null && sys.isConnected()) { - sys.disconnect(); + DistributedSystem sys = locator.getDistributedSystem(); + + Object[] connectArgs = new Object[] { properties }; + + SerializableRunnable disconnect = + new SerializableRunnable("Disconnect from " + locators) { + public void run() { + DistributedSystem sys = InternalDistributedSystem.getAnyInstance(); + if (sys != null && sys.isConnected()) { + sys.disconnect(); + } } - } - }; - - assertTrue(MembershipManagerHelper.getLeadMember(sys) == null); - - // connect three vms and then watch the lead member selection as they - // are disconnected/reconnected - properties.put("name", "vm1"); - DistributedMember mem1 = (DistributedMember)vm1.invoke(this.getClass(), - "getDistributedMember", connectArgs); - -// assertTrue(MembershipManagerHelper.getLeadMember(sys) != null); - assertLeadMember(mem1, sys, 5000); - - properties.put("name", "vm2"); - DistributedMember mem2 = (DistributedMember)vm2.invoke(this.getClass(), - "getDistributedMember", connectArgs); - assertLeadMember(mem1, sys, 5000); - - properties.put("name", "vm3"); - DistributedMember mem3 = (DistributedMember)vm3.invoke(this.getClass(), - "getDistributedMember", connectArgs); - assertLeadMember(mem1, sys, 5000); - - // after disconnecting the first vm, the second one should become the leader - vm1.invoke(disconnect); - MembershipManagerHelper.getMembershipManager(sys).waitForDeparture(mem1); - assertLeadMember(mem2, sys, 5000); - - properties.put("name", "vm1"); - mem1 = (DistributedMember)vm1.invoke(this.getClass(), - "getDistributedMember", connectArgs); - assertLeadMember(mem2, sys, 5000); - - vm2.invoke(disconnect); - MembershipManagerHelper.getMembershipManager(sys).waitForDeparture(mem2); - assertLeadMember(mem3, sys, 5000); - - vm1.invoke(disconnect); - MembershipManagerHelper.getMembershipManager(sys).waitForDeparture(mem1); - assertLeadMember(mem3, sys, 5000); + }; - vm3.invoke(disconnect); - MembershipManagerHelper.getMembershipManager(sys).waitForDeparture(mem3); - assertLeadMember(null, sys, 5000); + assertTrue(MembershipManagerHelper.getLeadMember(sys) == null); + + // connect three vms and then watch the lead member selection as they + // are disconnected/reconnected + properties.put("name", "vm1"); + DistributedMember mem1 = (DistributedMember) vm1.invoke(this.getClass(), + "getDistributedMember", connectArgs); + + // assertTrue(MembershipManagerHelper.getLeadMember(sys) != null); + assertLeadMember(mem1, sys, 5000); + + properties.put("name", "vm2"); + DistributedMember mem2 = (DistributedMember) vm2.invoke(this.getClass(), + "getDistributedMember", connectArgs); + assertLeadMember(mem1, sys, 5000); + + properties.put("name", "vm3"); + DistributedMember mem3 = (DistributedMember) vm3.invoke(this.getClass(), + "getDistributedMember", connectArgs); + assertLeadMember(mem1, sys, 5000); + + // after disconnecting the first vm, the second one should become the leader + vm1.invoke(disconnect); + MembershipManagerHelper.getMembershipManager(sys).waitForDeparture(mem1); + assertLeadMember(mem2, sys, 5000); + + properties.put("name", "vm1"); + mem1 = (DistributedMember) vm1.invoke(this.getClass(), + "getDistributedMember", connectArgs); + assertLeadMember(mem2, sys, 5000); + + vm2.invoke(disconnect); + MembershipManagerHelper.getMembershipManager(sys).waitForDeparture(mem2); + assertLeadMember(mem3, sys, 5000); + + vm1.invoke(disconnect); + MembershipManagerHelper.getMembershipManager(sys).waitForDeparture(mem1); + assertLeadMember(mem3, sys, 5000); + + vm3.invoke(disconnect); + MembershipManagerHelper.getMembershipManager(sys).waitForDeparture(mem3); + assertLeadMember(null, sys, 5000); } finally { locator.stop(); @@ -458,13 +453,14 @@ public class LocatorDUnitTest extends DistributedTestCase { } return (lead == null); } + public String description() { return null; } }; Wait.waitForCriterion(ev, timeout, 200, true); } - + /** * test lead member and coordinator failure with network partition detection * enabled. It would be nice for this test to have more than two "server" @@ -527,44 +523,43 @@ public class LocatorDUnitTest extends DistributedTestCase { } } }); - - Object[] connectArgs = new Object[]{ properties }; - - SerializableRunnable crashLocator = - new SerializableRunnable("Crash locator") { - public void run() { - Locator loc = Locator.getLocators().iterator().next(); - DistributedSystem msys = loc.getDistributedSystem(); - MembershipManagerHelper.crashDistributedSystem(msys); - loc.stop(); - } - }; + Object[] connectArgs = new Object[] { properties }; + + SerializableRunnable crashLocator = + new SerializableRunnable("Crash locator") { + public void run() { + Locator loc = Locator.getLocators().iterator().next(); + DistributedSystem msys = loc.getDistributedSystem(); + MembershipManagerHelper.crashDistributedSystem(msys); + loc.stop(); + } + }; assertTrue(MembershipManagerHelper.getLeadMember(sys) == null); - -// properties.put("log-level", getDUnitLogLevel()); - - DistributedMember mem1 = (DistributedMember)vm1.invoke(this.getClass(), + + // properties.put("log-level", getDUnitLogLevel()); + + DistributedMember mem1 = (DistributedMember) vm1.invoke(this.getClass(), "getDistributedMember", connectArgs); vm2.invoke(this.getClass(), "getDistributedMember", connectArgs); assertLeadMember(mem1, sys, 5000); - + assertEquals(sys.getDistributedMember(), MembershipManagerHelper.getCoordinator(sys)); - + // crash the second vm and the locator. Should be okay DistributedTestUtils.crashDistributedSystem(vm2); locvm.invoke(crashLocator); - + assertTrue("Distributed system should not have disconnected", vm1.invoke(() -> LocatorDUnitTest.isSystemConnected())); // ensure quorumLost is properly invoked - DistributionManager dm = (DistributionManager)((InternalDistributedSystem)sys).getDistributionManager(); + DistributionManager dm = (DistributionManager) ((InternalDistributedSystem) sys).getDistributionManager(); MyMembershipListener listener = new MyMembershipListener(); dm.addMembershipListener(listener); - + // disconnect the first vm and demonstrate that the third vm and the // locator notice the failure and exit DistributedTestUtils.crashDistributedSystem(vm1); @@ -579,6 +574,7 @@ public class LocatorDUnitTest extends DistributedTestCase { public boolean done() { return !sys.isConnected(); } + public String description() { return null; } @@ -590,20 +586,18 @@ public class LocatorDUnitTest extends DistributedTestCase { // quorumLost should be invoked if we get a ForcedDisconnect in this situation assertTrue("expected quorumLost to be invoked", listener.quorumLostInvoked); assertTrue("expected suspect processing initiated by TCPConduit", listener.suspectReasons.contains(Connection.INITIATING_SUSPECT_PROCESSING)); - } - finally { + } finally { if (locator != null) { locator.stop(); } LogWriter bLogger = - new LocalLogWriter(InternalLogWriter.ALL_LEVEL, System.out); + new LocalLogWriter(InternalLogWriter.ALL_LEVEL, System.out); bLogger.info("service failure"); bLogger.info("java.net.ConnectException"); bLogger.info("com.gemstone.gemfire.ForcedDisconnectException"); disconnectAllFromDS(); } } - /** * test lead member failure and normal coordinator shutdown with network partition detection @@ -662,31 +656,31 @@ public class LocatorDUnitTest extends DistributedTestCase { } } }); - - Object[] connectArgs = new Object[]{ properties }; - + + Object[] connectArgs = new Object[] { properties }; + SerializableRunnable crashSystem = - new SerializableRunnable("Crash system") { - public void run() { - DistributedSystem msys = InternalDistributedSystem.getAnyInstance(); - msys.getLogWriter().info("service failure"); - msys.getLogWriter().info("com.gemstone.gemfire.ConnectException"); - msys.getLogWriter().info("com.gemstone.gemfire.ForcedDisconnectException"); - MembershipManagerHelper.crashDistributedSystem(msys); - } - }; + new SerializableRunnable("Crash system") { + public void run() { + DistributedSystem msys = InternalDistributedSystem.getAnyInstance(); + msys.getLogWriter().info("service failure"); + msys.getLogWriter().info("com.gemstone.gemfire.ConnectException"); + msys.getLogWriter().info("com.gemstone.gemfire.ForcedDisconnectException"); + MembershipManagerHelper.crashDistributedSystem(msys); + } + }; assertTrue(MembershipManagerHelper.getLeadMember(sys) == null); - - DistributedMember mem1 = (DistributedMember)vm1.invoke(this.getClass(), + + DistributedMember mem1 = (DistributedMember) vm1.invoke(this.getClass(), "getDistributedMember", connectArgs); - DistributedMember mem2 = (DistributedMember)vm2.invoke(this.getClass(), + DistributedMember mem2 = (DistributedMember) vm2.invoke(this.getClass(), "getDistributedMember", connectArgs); assertEquals(mem1, MembershipManagerHelper.getLeadMember(sys)); - + assertEquals(sys.getDistributedMember(), MembershipManagerHelper.getCoordinator(sys)); - + MembershipManagerHelper.inhibitForcedDisconnectLogging(true); // crash the lead vm. Should be okay @@ -696,7 +690,7 @@ public class LocatorDUnitTest extends DistributedTestCase { assertTrue("Distributed system should not have disconnected", isSystemConnected()); - + assertTrue("Distributed system should not have disconnected", vm2.invoke(() -> LocatorDUnitTest.isSystemConnected())); @@ -705,13 +699,13 @@ public class LocatorDUnitTest extends DistributedTestCase { // stop the locator normally. This should also be okay locator.stop(); - + if (!Locator.getLocators().isEmpty()) { // log this for debugging purposes before throwing assertion error com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter().warning("found locator " + Locator.getLocators().iterator().next()); } assertTrue("locator is not stopped", Locator.getLocators().isEmpty()); - + assertTrue("Distributed system should not have disconnected", vm2.invoke(() -> LocatorDUnitTest.isSystemConnected())); @@ -724,7 +718,7 @@ public class LocatorDUnitTest extends DistributedTestCase { mem2, vm2.invoke(() -> LocatorDUnitTest.getLeadMember())); SerializableRunnable disconnect = - new SerializableRunnable("Disconnect from " + locators) { + new SerializableRunnable("Disconnect from " + locators) { public void run() { DistributedSystem sys = InternalDistributedSystem.getAnyInstance(); if (sys != null && sys.isConnected()) { @@ -737,8 +731,7 @@ public class LocatorDUnitTest extends DistributedTestCase { // locator notice the failure and exit vm2.invoke(disconnect); locvm.invoke(stopLocator); - } - finally { + } finally { MembershipManagerHelper.inhibitForcedDisconnectLogging(false); if (locator != null) { locator.stop(); @@ -792,7 +785,7 @@ public class LocatorDUnitTest extends DistributedTestCase { properties.put(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false"); SerializableRunnable stopLocator = getStopLocatorRunnable(); - + try { final String uname = getUniqueName(); File logFile = new File(""); @@ -803,51 +796,50 @@ public class LocatorDUnitTest extends DistributedTestCase { File lf = new File(""); try { Locator.startLocatorAndDS(port2, lf, properties); - } - catch (IOException ios) { + } catch (IOException ios) { com.gemstone.gemfire.test.dunit.Assert.fail("Unable to start locator2", ios); } } }); - - Object[] connectArgs = new Object[]{ properties }; - + + Object[] connectArgs = new Object[] { properties }; + SerializableRunnable crashSystem = - new SerializableRunnable("Crash system") { - public void run() { - DistributedSystem msys = InternalDistributedSystem.getAnyInstance(); - msys.getLogWriter().info("service failure"); - msys.getLogWriter().info("com.gemstone.gemfire.ConnectException"); - msys.getLogWriter().info("com.gemstone.gemfire.ForcedDisconnectException"); - msys.getLogWriter().info("Possible loss of quorum"); - hook = new TestHook(); - MembershipManagerHelper.getMembershipManager(msys).registerTestHook(hook); - try { - MembershipManagerHelper.crashDistributedSystem(msys); - } finally { - hook.reset(); + new SerializableRunnable("Crash system") { + public void run() { + DistributedSystem msys = InternalDistributedSystem.getAnyInstance(); + msys.getLogWriter().info("service failure"); + msys.getLogWriter().info("com.gemstone.gemfire.ConnectException"); + msys.getLogWriter().info("com.gemstone.gemfire.ForcedDisconnectException"); + msys.getLogWriter().info("Possible loss of quorum"); + hook = new TestHook(); + MembershipManagerHelper.getMembershipManager(msys).registerTestHook(hook); + try { + MembershipManagerHelper.crashDistributedSystem(msys); + } finally { + hook.reset(); + } } - } - }; + }; assertTrue(MembershipManagerHelper.getLeadMember(sys) == null); - - final DistributedMember mem1 = (DistributedMember)vm1.invoke(this.getClass(), + + final DistributedMember mem1 = (DistributedMember) vm1.invoke(this.getClass(), "getDistributedMember", connectArgs); - final DistributedMember mem2 = (DistributedMember)vm2.invoke(this.getClass(), + final DistributedMember mem2 = (DistributedMember) vm2.invoke(this.getClass(), "getDistributedMember", connectArgs); assertEquals(mem1, MembershipManagerHelper.getLeadMember(sys)); - + assertEquals(sys.getDistributedMember(), MembershipManagerHelper.getCoordinator(sys)); - + // crash the lead vm. Should be okay. it should hang in test hook thats // why call is asynchronous. //vm1.invokeAsync(crashSystem); assertTrue("Distributed system should not have disconnected", isSystemConnected()); - + assertTrue("Distributed system should not have disconnected", vm2.invoke(() -> LocatorDUnitTest.isSystemConnected())); @@ -860,12 +852,12 @@ public class LocatorDUnitTest extends DistributedTestCase { // request member removal for first peer from second peer. vm2.invoke(new SerializableRunnable("Request Member Removal") { - + @Override public void run() { DistributedSystem msys = InternalDistributedSystem.getAnyInstance(); MembershipManager mmgr = MembershipManagerHelper.getMembershipManager(msys); - + // check for shutdown cause in MembershipManager. Following call should // throw DistributedSystemDisconnectedException which should have cause as // ForceDisconnectException. @@ -885,8 +877,7 @@ public class LocatorDUnitTest extends DistributedTestCase { } }); - } - finally { + } finally { if (locator != null) { locator.stop(); } @@ -930,7 +921,7 @@ public class LocatorDUnitTest extends DistributedTestCase { properties.put(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false"); SerializableRunnable disconnect = - new SerializableRunnable("Disconnect from " + locators) { + new SerializableRunnable("Disconnect from " + locators) { public void run() { DistributedSystem sys = InternalDistributedSystem.getAnyInstance(); if (sys != null && sys.isConnected()) { @@ -940,11 +931,11 @@ public class LocatorDUnitTest extends DistributedTestCase { } }; SerializableRunnable expectedException = - new SerializableRunnable("Add expected exceptions") { - public void run() { - MembershipManagerHelper.inhibitForcedDisconnectLogging(true); - } - }; + new SerializableRunnable("Add expected exceptions") { + public void run() { + MembershipManagerHelper.inhibitForcedDisconnectLogging(true); + } + }; try { final String uname = getUniqueName(); locvm.invoke(new SerializableRunnable() { @@ -952,8 +943,7 @@ public class LocatorDUnitTest extends DistributedTestCase { File lf = new File(""); try { Locator.startLocatorAndDS(port2, lf, properties); - } - catch (IOException ios) { + } catch (IOException ios) { com.gemstone.gemfire.test.dunit.Assert.fail("Unable to start locator1", ios); } } @@ -963,42 +953,42 @@ public class LocatorDUnitTest extends DistributedTestCase { locator = Locator.startLocatorAndDS(port1, logFile, properties); DistributedSystem sys = locator.getDistributedSystem(); sys.getLogWriter().info("com.gemstone.gemfire.ForcedDisconnectException"); - Object[] connectArgs = new Object[]{ properties }; - + Object[] connectArgs = new Object[] { properties }; + SerializableRunnable crashLocator = - new SerializableRunnable("Crash locator") { - public void run() { - Locator loc = Locator.getLocators().iterator().next(); - DistributedSystem msys = loc.getDistributedSystem(); - msys.getLogWriter().info("service failure"); - msys.getLogWriter().info("com.gemstone.gemfire.ForcedDisconnectException"); - msys.getLogWriter().info("com.gemstone.gemfire.ConnectException"); - MembershipManagerHelper.crashDistributedSystem(msys); - loc.stop(); - } - }; + new SerializableRunnable("Crash locator") { + public void run() { + Locator loc = Locator.getLocators().iterator().next(); + DistributedSystem msys = loc.getDistributedSystem(); + msys.getLogWriter().info("service failure"); + msys.getLogWriter().info("com.gemstone.gemfire.ForcedDisconnectException"); + msys.getLogWriter().info("com.gemstone.gemfire.ConnectException"); + MembershipManagerHelper.crashDistributedSystem(msys); + loc.stop(); + } + }; assertTrue(MembershipManagerHelper.getLeadMember(sys) == null); - - DistributedMember mem1 = (DistributedMember)vm1.invoke(this.getClass(), + + DistributedMember mem1 = (DistributedMember) vm1.invoke(this.getClass(), "getDistributedMember", connectArgs); vm1.invoke(expectedException); - DistributedMember mem2 = (DistributedMember)vm2.invoke(this.getClass(), + DistributedMember mem2 = (DistributedMember) vm2.invoke(this.getClass(), "getDistributedMember", connectArgs); DistributedMember loc1Mbr = (DistributedMember)locvm.invoke(() -> this.getLocatorDistributedMember()); assertLeadMember(mem1, sys, 5000); - + assertEquals(loc1Mbr, MembershipManagerHelper.getCoordinator(sys)); - + // crash the lead locator. Should be okay locvm.invoke(crashLocator); Wait.pause(10 * 1000); assertTrue("Distributed system should not have disconnected", sys.isConnected()); - + assertTrue("Distributed system should not have disconnected", vm1.invoke(() -> LocatorDUnitTest.isSystemConnected())); @@ -1016,11 +1006,10 @@ public class LocatorDUnitTest extends DistributedTestCase { assertEquals(sys.getDistributedMember(), MembershipManagerHelper.getCoordinator(sys)); assertEquals(mem2, MembershipManagerHelper.getLeadMember(sys)); - - } - finally { + + } finally { vm2.invoke(disconnect); - + if (locator != null) { locator.stop(); } @@ -1044,17 +1033,17 @@ public class LocatorDUnitTest extends DistributedTestCase { props.setProperty("locators", locators); final String expected = "java.net.ConnectException"; - final String addExpected = - "" + expected + ""; - final String removeExpected = - "" + expected + ""; - + final String addExpected = + "" + expected + ""; + final String removeExpected = + "" + expected + ""; + LogWriter bgexecLogger = - new LocalLogWriter(InternalLogWriter.ALL_LEVEL, System.out); + new LocalLogWriter(InternalLogWriter.ALL_LEVEL, System.out); bgexecLogger.info(addExpected); - + boolean exceptionOccurred = true; - String oldValue = (String)System.getProperties().put("p2p.joinTimeout", "15000"); + String oldValue = (String) System.getProperties().put("p2p.joinTimeout", "15000"); try { DistributedSystem.connect(props); exceptionOccurred = false; @@ -1065,7 +1054,7 @@ public class LocatorDUnitTest extends DistributedTestCase { } catch (GemFireConfigException ex) { String s = ex.getMessage(); assertTrue(s.indexOf("Locator does not exist") >= 0); - + } catch (Exception ex) { // if you see this fail, determine if unexpected exception is expected // if expected then add in a catch block for it above this catch @@ -1105,14 +1094,14 @@ public class LocatorDUnitTest extends DistributedTestCase { DistributedTestUtils.deleteLocatorStateFile(port1); final String locators = NetworkUtils.getServerHostName(host) + "[" + port + "]"; final String uniqueName = getUniqueName(); - + vm0.invoke(new SerializableRunnable("Start locator " + locators) { - public void run() { - File logFile = new File(""); - try { - Properties locProps = new Properties(); - locProps.setProperty("mcast-port", "0"); - locProps.put(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false"); + public void run() { + File logFile = new File(""); + try { + Properties locProps = new Properties(); + locProps.setProperty("mcast-port", "0"); + locProps.put(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false"); Locator.startLocatorAndDS(port, logFile, locProps); } catch (IOException ex) { @@ -1122,22 +1111,22 @@ public class LocatorDUnitTest extends DistributedTestCase { }); try { - SerializableRunnable connect = - new SerializableRunnable("Connect to " + locators) { - public void run() { - //System.setProperty("p2p.joinTimeout", "5000"); - Properties props = new Properties(); - props.setProperty("mcast-port", "0"); - props.setProperty("locators", locators); - DistributedSystem.connect(props); - } - }; - vm1.invoke(connect); - vm2.invoke(connect); + SerializableRunnable connect = + new SerializableRunnable("Connect to " + locators) { + public void run() { + //System.setProperty("p2p.joinTimeout", "5000"); + Properties props = new Properties(); + props.setProperty("mcast-port", "0"); + props.setProperty("locators", locators); + DistributedSystem.connect(props); + } + }; + vm1.invoke(connect); + vm2.invoke(connect); - Properties props = new Properties(); - props.setProperty("mcast-port", "0"); - props.setProperty("locators", locators); + Properties props = new Properties(); + props.setProperty("mcast-port", "0"); + props.setProperty("locators", locators); system = (InternalDistributedSystem)DistributedSystem.connect(props); @@ -1184,14 +1173,6 @@ public class LocatorDUnitTest extends DistributedTestCase { } } -// public void testRepeat() throws Exception { -// for (int i=0; i<10; i++) { -// System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> run #"+i); -// testLocatorBecomesCoordinator(); -// tearDown(); -// setUp(); -// } -// } /** * Tests starting one locator in a remote VM and having multiple * members of the distributed system join it. This ensures that @@ -1202,10 +1183,10 @@ public class LocatorDUnitTest extends DistributedTestCase { public void testLocatorBecomesCoordinator() throws Exception { disconnectAllFromDS(); final String expected = "java.net.ConnectException"; - final String addExpected = - "" + expected + ""; - final String removeExpected = - "" + expected + ""; + final String addExpected = + "" + expected + ""; + final String removeExpected = + "" + expected + ""; Host host = Host.getHost(0); VM vm0 = host.getVM(0); @@ -1220,23 +1201,23 @@ public class LocatorDUnitTest extends DistributedTestCase { vm0.invoke(getStartSBLocatorRunnable(port, getUniqueName()+"1")); try { - final Properties props = new Properties(); - props.setProperty("mcast-port", "0"); - props.setProperty("locators", locators); - props.put(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false"); + final Properties props = new Properties(); + props.setProperty("mcast-port", "0"); + props.setProperty("locators", locators); + props.put(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false"); - SerializableRunnable connect = - new SerializableRunnable("Connect to " + locators) { - public void run() { - //System.setProperty("p2p.joinTimeout", "5000"); - DistributedSystem sys = getSystem(props); - sys.getLogWriter().info(addExpected); - } - }; - vm1.invoke(connect); - vm2.invoke(connect); + SerializableRunnable connect = + new SerializableRunnable("Connect to " + locators) { + public void run() { + //System.setProperty("p2p.joinTimeout", "5000"); + DistributedSystem sys = getSystem(props); + sys.getLogWriter().info(addExpected); + } + }; + vm1.invoke(connect); + vm2.invoke(connect); - system = (InternalDistributedSystem)getSystem(props); + system = (InternalDistributedSystem) getSystem(props); final DistributedMember coord = MembershipManagerHelper.getCoordinator(system); com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter().info("coordinator before termination of locator is " + coord); @@ -1254,7 +1235,7 @@ public class LocatorDUnitTest extends DistributedTestCase { } }; Wait.waitForCriterion(ev, 15000, 200, true); - DistributedMember newCoord = MembershipManagerHelper.getCoordinator(system); + DistributedMember newCoord = MembershipManagerHelper.getCoordinator(system); com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter().info("coordinator after shutdown of locator was " + newCoord); if (newCoord == null || coord.equals(newCoord)) { @@ -1284,23 +1265,23 @@ public class LocatorDUnitTest extends DistributedTestCase { new LocalLogWriter(InternalLogWriter.ALL_LEVEL, System.out); bgexecLogger.info(removeExpected); - SerializableRunnable disconnect = - new SerializableRunnable("Disconnect from " + locators) { - public void run() { - DistributedSystem sys = InternalDistributedSystem.getAnyInstance(); - if (sys != null && sys.isConnected()) { - sys.disconnect(); + SerializableRunnable disconnect = + new SerializableRunnable("Disconnect from " + locators) { + public void run() { + DistributedSystem sys = InternalDistributedSystem.getAnyInstance(); + if (sys != null && sys.isConnected()) { + sys.disconnect(); + } + // connectExceptions occur during disconnect, so we need the + // expectedexception hint to be in effect until this point + LogWriter bLogger = + new LocalLogWriter(InternalLogWriter.ALL_LEVEL, System.out); + bLogger.info(removeExpected); } - // connectExceptions occur during disconnect, so we need the - // expectedexception hint to be in effect until this point - LogWriter bLogger = - new LocalLogWriter(InternalLogWriter.ALL_LEVEL, System.out); - bLogger.info(removeExpected); - } - }; - vm1.invoke(disconnect); - vm2.invoke(disconnect); - vm0.invoke(getStopLocatorRunnable()); + }; + vm1.invoke(disconnect); + vm2.invoke(disconnect); + vm0.invoke(getStopLocatorRunnable()); } finally { vm0.invoke(getStopLocatorRunnable()); } @@ -1317,7 +1298,7 @@ public class LocatorDUnitTest extends DistributedTestCase { public static void resetRefreshWait() { System.getProperties().remove("p2p.gossipRefreshRate"); } - + public static boolean isSystemConnected() { DistributedSystem sys = InternalDistributedSystem.getAnyInstance(); if (sys != null && sys.isConnected()) { @@ -1325,12 +1306,10 @@ public class LocatorDUnitTest extends DistributedTestCase { } return false; } - static boolean beforeFailureNotificationReceived; static boolean afterFailureNotificationReceived; - /** * Tests starting multiple locators in multiple VMs. */ @@ -1348,15 +1327,14 @@ public class LocatorDUnitTest extends DistributedTestCase { final int port2 = freeTCPPorts[1]; this.port2 = port2; DistributedTestUtils.deleteLocatorStateFile(port1, port2); - final String host0 = NetworkUtils.getServerHostName(host); + final String host0 = NetworkUtils.getServerHostName(host); final String locators = host0 + "[" + port1 + "]," + - host0 + "[" + port2 + "]"; + host0 + "[" + port2 + "]"; final Properties dsProps = new Properties(); dsProps.setProperty("locators", locators); dsProps.setProperty("mcast-port", "0"); dsProps.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false"); -// dsProps.setProperty("log-level", "finest"); final String uniqueName = getUniqueName(); vm0.invoke(new SerializableRunnable("Start locator on " + port1) { @@ -1387,14 +1365,14 @@ public class LocatorDUnitTest extends DistributedTestCase { try { SerializableRunnable connect = - new SerializableRunnable("Connect to " + locators) { - public void run() { - Properties props = new Properties(); - props.setProperty("mcast-port", "0"); - props.setProperty("locators", locators); - DistributedSystem.connect(props); - } - }; + new SerializableRunnable("Connect to " + locators) { + public void run() { + Properties props = new Properties(); + props.setProperty("mcast-port", "0"); + props.setProperty("locators", locators); + DistributedSystem.connect(props); + } + }; vm1.invoke(connect); vm2.invoke(connect); @@ -1402,19 +1380,19 @@ public class LocatorDUnitTest extends DistributedTestCase { props.setProperty("mcast-port", "0"); props.setProperty("locators", locators); - system = (InternalDistributedSystem)DistributedSystem.connect(props); + system = (InternalDistributedSystem) DistributedSystem.connect(props); WaitCriterion ev = new WaitCriterion() { public boolean done() { try { return system.getDM().getViewMembers().size() >= 3; - } - catch (Exception e) { + } catch (Exception e) { e.printStackTrace(); fail("unexpected exception"); } return false; // NOTREACHED } + public String description() { return null; } @@ -1427,14 +1405,14 @@ public class LocatorDUnitTest extends DistributedTestCase { system.disconnect(); SerializableRunnable disconnect = - new SerializableRunnable("Disconnect from " + locators) { - public void run() { - DistributedSystem sys = InternalDistributedSystem.getAnyInstance(); - if (sys != null && sys.isConnected()) { - sys.disconnect(); - } - } - }; + new SerializableRunnable("Disconnect from " + locators) { + public void run() { + DistributedSystem sys = InternalDistributedSystem.getAnyInstance(); + if (sys != null && sys.isConnected()) { + sys.disconnect(); + } + } + }; vm1.invoke(disconnect); vm2.invoke(disconnect); @@ -1447,6 +1425,158 @@ public class LocatorDUnitTest extends DistributedTestCase { } /** + * Tests starting multiple locators at the same time and ensuring that the locators + * end up only have 1 master. + * GEODE-870 + */ + public void testMultipleLocatorsRestartingAtSameTime() throws Exception { + disconnectAllFromDS(); + Host host = Host.getHost(0); + VM vm0 = host.getVM(0); + VM vm1 = host.getVM(1); + VM vm2 = host.getVM(2); + VM vm3 = host.getVM(3); + VM vm4 = host.getVM(4); + + int[] freeTCPPorts = AvailablePortHelper.getRandomAvailableTCPPorts(3); + this.port1 = freeTCPPorts[0]; + this.port2 = freeTCPPorts[1]; + int port3 = freeTCPPorts[2]; + deleteLocatorStateFile(port1, port2, port3); + final String host0 = getServerHostName(host); + final String locators = host0 + "[" + port1 + "]," + + host0 + "[" + port2 + "]," + + host0 + "[" + port3 + "]"; + + final Properties dsProps = new Properties(); + dsProps.setProperty("locators", locators); + dsProps.setProperty("mcast-port", "0"); + dsProps.setProperty("log-level", "FINE"); + dsProps.setProperty("enable-network-partition-detection", "true"); + dsProps.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false"); + final String uniqueName = getUniqueName(); + + startLocatorSync(vm0, new Object[] { port1, dsProps }); + startLocatorSync(vm1, new Object[] { port2, dsProps }); + startLocatorSync(vm2, new Object[] { port3, dsProps }); + try { + try { + + SerializableRunnable connect = + new SerializableRunnable("Connect to " + locators) { + public void run() { + Properties props = new Properties(); + props.setProperty("mcast-port", "0"); + props.setProperty("locators", locators); + props.setProperty("log-level", "FINE"); + props.setProperty("enable-network-partition-detection", "true"); + DistributedSystem.connect(props); + } + }; + vm3.invoke(connect); + vm4.invoke(connect); + + Properties props = new Properties(); + props.setProperty("mcast-port", "0"); + props.setProperty("locators", locators); + + system = (InternalDistributedSystem) DistributedSystem.connect(props); + + WaitCriterion waitCriterion = new WaitCriterion() { + public boolean done() { + try { + return system.getDM().getViewMembers().size() >= 3; + } catch (Exception e) { + e.printStackTrace(); + fail("unexpected exception"); + } + return false; // NOTREACHED + } + + public String description() { + return null; + } + }; + DistributedTestCase.waitForCriterion(waitCriterion, 10 * 1000, 200, true); + + // three applications plus + assertEquals(6, system.getDM().getViewMembers().size()); + + vm0.invoke(getStopLocatorRunnable()); + vm1.invoke(getStopLocatorRunnable()); + vm2.invoke(getStopLocatorRunnable()); + + final String newLocators = host0 + "[" + port2 + "]," + + host0 + "[" + port3 + "]"; + dsProps.setProperty("locators", newLocators); + + startLocatorAsync(vm1, new Object[] { port2, dsProps }); + startLocatorAsync(vm2, new Object[] { port3, dsProps }); + + waitCriterion = new WaitCriterion() { + public boolean done() { + try { + return system.getDM().getAllHostedLocators().size() == 2; + } catch (Exception e) { + e.printStackTrace(); + fail("unexpected exception"); + } + return false; // NOTREACHED + } + + public String description() { + return null; + } + }; + DistributedTestCase.waitForCriterion(waitCriterion, 10 * 1000, 200, true); + + } finally { + system.disconnect(); + SerializableRunnable disconnect = + new SerializableRunnable("Disconnect from " + locators) { + public void run() { + DistributedSystem sys = InternalDistributedSystem.getAnyInstance(); + if (sys != null && sys.isConnected()) { + sys.disconnect(); + } + } + }; + vm3.invoke(disconnect); + vm4.invoke(disconnect); + vm2.invoke(getStopLocatorRunnable()); + vm1.invoke(getStopLocatorRunnable()); + } + } finally { + } + } + + private void startLocatorSync(VM vm, Object[] args) { + vm.invoke(new SerializableRunnable("Starting process on " + args[0], args) { + public void run() { + File logFile = new File(""); + try { + Locator.startLocatorAndDS((int) args[0], logFile, (Properties) args[1]); + } catch (IOException ex) { + fail("While starting process on port " + args[0], ex); + } + } + }); + } + + private void startLocatorAsync(VM vm, Object[] args) { + vm.invokeAsync(new SerializableRunnable("Starting process on " + args[0], args) { + public void run() { + File logFile = new File(""); + try { + Locator.startLocatorAndDS((int) args[0], logFile, (Properties) args[1]); + } catch (IOException ex) { + fail("While starting process on port " + args[0], ex); + } + } + }); + } + + /** * Tests starting multiple locators in multiple VMs. */ public void testMultipleMcastLocators() throws Exception { @@ -1466,11 +1596,11 @@ public class LocatorDUnitTest extends DistributedTestCase { DistributedTestUtils.deleteLocatorStateFile(port1, port2); final int mcastport = AvailablePort.getRandomAvailablePort(AvailablePort.MULTICAST); - final String host0 = NetworkUtils.getServerHostName(host); + final String host0 = NetworkUtils.getServerHostName(host); final String locators = host0 + "[" + port1 + "]," + - host0 + "[" + port2 + "]"; + host0 + "[" + port2 + "]"; final String uniqueName = getUniqueName(); - + vm0.invoke(new SerializableRunnable("Start locator on " + port1) { public void run() { File logFile = new File(""); @@ -1510,7 +1640,7 @@ public class LocatorDUnitTest extends DistributedTestCase { }); SerializableRunnable connect = - new SerializableRunnable("Connect to " + locators) { + new SerializableRunnable("Connect to " + locators) { public void run() { Properties props = new Properties(); props.setProperty("mcast-port", String.valueOf(mcastport)); @@ -1532,17 +1662,17 @@ public class LocatorDUnitTest extends DistributedTestCase { props.setProperty("mcast-ttl", "0"); props.setProperty("enable-network-partition-detection", "true"); - system = (InternalDistributedSystem)DistributedSystem.connect(props); + system = (InternalDistributedSystem) DistributedSystem.connect(props); WaitCriterion ev = new WaitCriterion() { public boolean done() { try { return system.getDM().getViewMembers().size() == 5; - } - catch (Exception e) { + } catch (Exception e) { com.gemstone.gemfire.test.dunit.Assert.fail("unexpected exception", e); } return false; // NOTREACHED } + public String description() { return "waiting for 5 members - have " + system.getDM().getViewMembers().size(); } @@ -1551,7 +1681,7 @@ public class LocatorDUnitTest extends DistributedTestCase { system.disconnect(); SerializableRunnable disconnect = - new SerializableRunnable("Disconnect from " + locators) { + new SerializableRunnable("Disconnect from " + locators) { public void run() { DistributedSystem sys = InternalDistributedSystem.getAnyInstance(); if (sys != null && sys.isConnected()) { @@ -1561,8 +1691,7 @@ public class LocatorDUnitTest extends DistributedTestCase { }; vm1.invoke(disconnect); vm2.invoke(disconnect); - } - finally { + } finally { SerializableRunnable stop = getStopLocatorRunnable(); vm0.invoke(stop); vm3.invoke(stop); @@ -1572,7 +1701,6 @@ public class LocatorDUnitTest extends DistributedTestCase { } } - /** * Tests that a VM can connect to a locator that is hosted in its * own VM. @@ -1590,14 +1718,14 @@ public class LocatorDUnitTest extends DistributedTestCase { final String locators = NetworkUtils.getServerHostName(host) + "[" + port1 + "]"; - Properties props = new Properties(); - props.setProperty("mcast-port", "0"); - props.setProperty("locators", locators); - props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false"); - system = (InternalDistributedSystem)DistributedSystem.connect(props); - system.disconnect(); + Properties props = new Properties(); + props.setProperty("mcast-port", "0"); + props.setProperty("locators", locators); + props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false"); + system = (InternalDistributedSystem) DistributedSystem.connect(props); + system.disconnect(); } finally { - locator.stop(); + locator.stop(); } } @@ -1610,7 +1738,7 @@ public class LocatorDUnitTest extends DistributedTestCase { Host host = Host.getHost(0); VM vm1 = host.getVM(1); Locator locator = null; - + try { port1 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); DistributedTestUtils.deleteLocatorStateFile(port1); @@ -1623,37 +1751,36 @@ public class LocatorDUnitTest extends DistributedTestCase { properties.put(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false"); File logFile = new File(""); locator = Locator.startLocatorAndDS(port1, logFile, properties); - + final Properties properties2 = new Properties(); properties2.put("mcast-port", "0"); properties2.put("locators", locators); properties2.put(DistributionConfig.ENABLE_NETWORK_PARTITION_DETECTION_NAME, "false"); properties2.put(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false"); properties2.put("disable-auto-reconnect", "true"); - + vm1.invoke(new SerializableRunnable("try to connect") { public void run() { DistributedSystem s = null; try { s = DistributedSystem.connect(properties2); - boolean enabled = ((InternalDistributedSystem)s).getConfig().getEnableNetworkPartitionDetection(); + boolean enabled = ((InternalDistributedSystem) s).getConfig().getEnableNetworkPartitionDetection(); s.disconnect(); if (!enabled) { fail("should not have been able to connect with different enable-network-partition-detection settings"); } - } - catch (GemFireConfigException e) { + } catch (GemFireConfigException e) { fail("should have been able to connect and have enable-network-partion-detection enabled"); } } }); - + locator.stop(); - + // now start the locator with enable-network-partition-detection=false logFile = new File(""); - locator = Locator.startLocatorAndDS(port1, logFile , properties2); - + locator = Locator.startLocatorAndDS(port1, logFile, properties2); + vm1.invoke(new SerializableRunnable("try to connect") { public void run() { DistributedSystem s = null; @@ -1661,17 +1788,15 @@ public class LocatorDUnitTest extends DistributedTestCase { s = DistributedSystem.connect(properties); s.disconnect(); fail("should not have been able to connect with different enable-network-partition-detection settings"); - } - catch (GemFireConfigException e) { + } catch (GemFireConfigException e) { // passed } } }); - + locator.stop(); locator = null; - } - finally { + } finally { if (locator != null) { locator.stop(); } @@ -1688,16 +1813,16 @@ public class LocatorDUnitTest extends DistributedTestCase { //VM vm1 = host.getVM(1); port1 = - AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); + AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); File logFile1 = new File(""); DistributedTestUtils.deleteLocatorStateFile(port1); Locator locator1 = Locator.startLocator(port1, logFile1); - + try { - int port2 = - AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); - File logFile2 = new File(""); + int port2 = + AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); + File logFile2 = new File(""); DistributedTestUtils.deleteLocatorStateFile(port2); @@ -1724,21 +1849,21 @@ public class LocatorDUnitTest extends DistributedTestCase { connect.run(); //vm1.invoke(connect); - SerializableRunnable disconnect = - new SerializableRunnable("Disconnect from " + locators) { - public void run() { - DistributedSystem sys = InternalDistributedSystem.getAnyInstance(); - if (sys != null && sys.isConnected()) { - sys.disconnect(); + SerializableRunnable disconnect = + new SerializableRunnable("Disconnect from " + locators) { + public void run() { + DistributedSystem sys = InternalDistributedSystem.getAnyInstance(); + if (sys != null && sys.isConnected()) { + sys.disconnect(); + } } - } - }; + }; - disconnect.run(); - //vm1.invoke(disconnect); + disconnect.run(); + //vm1.invoke(disconnect); } finally { - locator1.stop(); + locator1.stop(); } } @@ -1754,10 +1879,10 @@ public class LocatorDUnitTest extends DistributedTestCase { AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); DistributedTestUtils.deleteLocatorStateFile(port1); File logFile = new File(""); - File stateFile = new File("locator"+port1+"state.dat"); + File stateFile = new File("locator" + port1 + "state.dat"); VM vm0 = Host.getHost(0).getVM(0); final Properties p = new Properties(); - p.setProperty(DistributionConfig.LOCATORS_NAME, Host.getHost(0).getHostName() + "["+port1+"]"); + p.setProperty(DistributionConfig.LOCATORS_NAME, Host.getHost(0).getHostName() + "[" + port1 + "]"); p.setProperty(DistributionConfig.MCAST_PORT_NAME, "0"); p.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false"); if (stateFile.exists()) { @@ -1778,7 +1903,7 @@ public class LocatorDUnitTest extends DistributedTestCase { com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter().info("Stopping locator"); locator.stop(); - + com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter().info("Starting locator"); locator = Locator.startLocatorAndDS(port1, logFile, p); @@ -1789,14 +1914,14 @@ public class LocatorDUnitTest extends DistributedTestCase { }); } finally { - locator.stop(); + locator.stop(); } } /** return the distributed member id for the ds on this vm */ public static DistributedMember getDistributedMember(Properties props) { - props.put("name", "vm_"+VM.getCurrentVMNum()); + props.put("name", "vm_" + VM.getCurrentVMNum()); DistributedSystem sys = DistributedSystem.connect(props); sys.getLogWriter().info("service failure"); sys.getLogWriter().info("com.gemstone.gemfire.ConnectException"); @@ -1807,7 +1932,7 @@ public class LocatorDUnitTest extends DistributedTestCase { /** find a running locator and return its distributed member id */ public static DistributedMember getLocatorDistributedMember() { return (Locator.getLocators().iterator().next()) - .getDistributedSystem().getDistributedMember(); + .getDistributedSystem().getDistributedMember(); } /** find the lead member and return its id */ @@ -1828,7 +1953,7 @@ public class LocatorDUnitTest extends DistributedTestCase { } }; } - + private SerializableRunnable getStartSBLocatorRunnable(final int port, final String name) { return new SerializableRunnable("Start locator on port " + port) { public void run() { @@ -1850,26 +1975,25 @@ public class LocatorDUnitTest extends DistributedTestCase { } }; } - + protected void nukeJChannel(DistributedSystem sys) { sys.getLogWriter().info("service failure"); sys.getLogWriter().info("com.gemstone.gemfire.ConnectException"); sys.getLogWriter().info("com.gemstone.gemfire.ForcedDisconnectException"); try { MembershipManagerHelper.crashDistributedSystem(sys); - } - catch (DistributedSystemDisconnectedException se) { + } catch (DistributedSystemDisconnectedException se) { // it's okay for the system to already be shut down } sys.getLogWriter().info("service failure"); sys.getLogWriter().info("com.gemstone.gemfire.ForcedDisconnectException"); } - //New test hook which blocks before closing channel. class TestHook implements MembershipTestHook { volatile boolean unboundedWait = true; + @Override public void beforeMembershipFailure(String reason, Throwable cause) { System.out.println("Inside TestHook.beforeMembershipFailure with " + cause); @@ -1892,22 +2016,26 @@ public class LocatorDUnitTest extends DistributedTestCase { } } + class MyMembershipListener implements MembershipListener { boolean quorumLostInvoked; List suspectReasons = new ArrayList<>(50); - - public void memberJoined(InternalDistributedMember id) { } - public void memberDeparted(InternalDistributedMember id, boolean crashed) { } + + public void memberJoined(InternalDistributedMember id) { + } + + public void memberDeparted(InternalDistributedMember id, boolean crashed) { + } + public void memberSuspect(InternalDistributedMember id, InternalDistributedMember whoSuspected, String reason) { suspectReasons.add(reason); } + public void quorumLost(Set failures, List remaining) { quorumLostInvoked = true; com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter().info("quorumLost invoked in test code"); } } - - } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/036f2205/geode-core/src/test/java/com/gemstone/gemfire/test/dunit/SerializableRunnable.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/com/gemstone/gemfire/test/dunit/SerializableRunnable.java b/geode-core/src/test/java/com/gemstone/gemfire/test/dunit/SerializableRunnable.java index 353cdc7..4caf815 100644 --- a/geode-core/src/test/java/com/gemstone/gemfire/test/dunit/SerializableRunnable.java +++ b/geode-core/src/test/java/com/gemstone/gemfire/test/dunit/SerializableRunnable.java @@ -49,8 +49,9 @@ import java.io.Serializable; public abstract class SerializableRunnable implements SerializableRunnableIF { private static final long serialVersionUID = 7584289978241650456L; - + private String name; + private Object[] args; public SerializableRunnable() { this.name = null; @@ -70,11 +71,16 @@ public abstract class SerializableRunnable implements SerializableRunnableIF { public SerializableRunnable(String name) { this.name = name; } - + + public SerializableRunnable(String name, Object[] args) { + this.name = name; + this.args = args; + } + public void setName(String newName) { this.name = newName; } - + public String getName() { return this.name; }