Return-Path: X-Original-To: apmail-geode-commits-archive@minotaur.apache.org Delivered-To: apmail-geode-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 58DFB180B8 for ; Mon, 21 Dec 2015 18:04:39 +0000 (UTC) Received: (qmail 42170 invoked by uid 500); 21 Dec 2015 18:04:39 -0000 Delivered-To: apmail-geode-commits-archive@geode.apache.org Received: (qmail 42071 invoked by uid 500); 21 Dec 2015 18:04:39 -0000 Mailing-List: contact commits-help@geode.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.incubator.apache.org Delivered-To: mailing list commits@geode.incubator.apache.org Received: (qmail 42030 invoked by uid 99); 21 Dec 2015 18:04:39 -0000 Received: from Unknown (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 21 Dec 2015 18:04:39 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id A6E40C98BB for ; Mon, 21 Dec 2015 18:04:38 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.78 X-Spam-Level: * X-Spam-Status: No, score=1.78 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.001, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-us-east.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id jC-0vrmaOi-H for ; Mon, 21 Dec 2015 18:04:32 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-east.apache.org (ASF Mail Server at mx1-us-east.apache.org) with SMTP id 8B1F1439F5 for ; Mon, 21 Dec 2015 18:04:30 +0000 (UTC) Received: (qmail 41184 invoked by uid 99); 21 Dec 2015 18:04:30 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 21 Dec 2015 18:04:30 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D0915E07F8; Mon, 21 Dec 2015 18:04:29 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jensdeppe@apache.org To: commits@geode.incubator.apache.org Date: Mon, 21 Dec 2015 18:04:34 -0000 Message-Id: In-Reply-To: <09ae6dc62ac24965ba79507b5b76b548@git.apache.org> References: <09ae6dc62ac24965ba79507b5b76b548@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [06/50] [abbrv] incubator-geode git commit: StateFlushOperation flushing/waiting for multicast messages StateFlushOperation flushing/waiting for multicast messages As part of the State Flush algorithm we record the state of the multicast protocol in JGroups and send it to the initial image provider. There the information is used to wait for the on-wire multicast messages to be received. This change set also includes additional JGroupsMessenger tests for better code coverage and fixes a few Find Bugs problems in that class and GMSJoinLeave. One of these required replacing a volatile long with an AtomicLong because the long was being incremented, which is not necessarily an atomic operation on a volatile variable. Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/2f0c7fcb Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/2f0c7fcb Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/2f0c7fcb Branch: refs/heads/feature/GEODE-14 Commit: 2f0c7fcb5bd46ecc37af9341f43e8aa3521048e3 Parents: eb685b4 Author: Bruce Schuchardt Authored: Fri Dec 11 15:06:46 2015 -0800 Committer: Bruce Schuchardt Committed: Fri Dec 11 15:09:08 2015 -0800 ---------------------------------------------------------------------- .../internal/direct/DirectChannel.java | 2 +- .../internal/membership/MembershipManager.java | 18 ++--- .../membership/gms/interfaces/Messenger.java | 22 +++++++ .../membership/gms/membership/GMSJoinLeave.java | 10 +-- .../gms/messenger/JGroupsMessenger.java | 69 +++++++++++++++----- .../gms/mgr/GMSMembershipManager.java | 20 ++---- .../internal/cache/StateFlushOperation.java | 6 +- .../gemfire/internal/tcp/ConnectionTable.java | 2 +- .../gemfire/internal/tcp/TCPConduit.java | 2 +- .../messenger/JGroupsMessengerJUnitTest.java | 64 +++++++++++++++++- 10 files changed, 165 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2f0c7fcb/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/DirectChannel.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/DirectChannel.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/DirectChannel.java index f84813e..14ff923 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/DirectChannel.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/DirectChannel.java @@ -962,7 +962,7 @@ public class DirectChannel { * the map to add the state to * @since 5.1 */ - public void getChannelStates(Stub member, HashMap result) + public void getChannelStates(Stub member, Map result) { TCPConduit tc = this.conduit; if (tc != null) { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2f0c7fcb/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/MembershipManager.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/MembershipManager.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/MembershipManager.java index 54b82a7..a46680b 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/MembershipManager.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/MembershipManager.java @@ -177,30 +177,30 @@ public interface MembershipManager { public boolean shutdownInProgress(); /** - * Returns a serializable map of communication channel state for + * Returns a serializable map of communications state for * use in state stabilization. * @param member - * the member whose channel state is to be captured + * the member whose message state is to be captured * @param includeMulticast - * whether the state of the mcast channel should be included + * whether the state of the mcast messaging should be included * @return the current state of the communication channels between this * process and the given distributed member * @since 5.1 */ - public Map getChannelStates(DistributedMember member, boolean includeMulticast); + public Map getMessageState(DistributedMember member, boolean includeMulticast); /** - * Waits for the given communication channels to reach the associated + * Waits for the given communications to reach the associated * state * @param member - * The member whose channel state we're waiting for - * @param channelState - * The channel states to wait for. This should come from getChannelStates + * The member whose messaging state we're waiting for + * @param state + * The message states to wait for. This should come from getMessageStates * @throws InterruptedException * Thrown if the thread is interrupted * @since 5.1 */ - public void waitForChannelState(DistributedMember member, Map channelState) + public void waitForMessageState(DistributedMember member, Map state) throws InterruptedException; /** http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2f0c7fcb/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Messenger.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Messenger.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Messenger.java index 9def731..5bb6c4b 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Messenger.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Messenger.java @@ -16,6 +16,7 @@ */ package com.gemstone.gemfire.distributed.internal.membership.gms.interfaces; +import java.util.Map; import java.util.Set; import com.gemstone.gemfire.distributed.internal.DistributionMessage; @@ -56,4 +57,25 @@ public interface Messenger extends Service { * @return true multicast is enabled and working */ boolean testMulticast(long timeout) throws InterruptedException; + + /** + * For the state-flush algorithm we need to be able to record + * the state of outgoing messages to the given member. If multicast + * is being used for region operations we also need to record its + * state. + * + * @param member the target member + * @param state messaging state is stored in this map + * @param includeMulticast whether to record multicast state + */ + void getMessageState(InternalDistributedMember member, Map state, boolean includeMulticast); + + /** + * The flip-side of getMessageState, this method takes the state it recorded + * and waits for messages from the given member to be received. + * + * @param member the member flushing operations to this member + * @param state the state of that member's outgoing messaging to this member + */ + void waitForMessageState(InternalDistributedMember member, Map state) throws InterruptedException; } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2f0c7fcb/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java index 3a3486b..abdceb4 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java @@ -313,7 +313,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { * @param coord * @return true if the attempt succeeded, false if it timed out */ - boolean attemptToJoin() { + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="WA_NOT_IN_LOOP") + boolean attemptToJoin() { SearchState state = searchState; // send a join request to the coordinator and wait for a response @@ -688,8 +689,6 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { boolean sendView(NetView view, List newMembers, boolean preparing, ViewReplyProcessor rp) { - boolean isNetworkPartition = isNetworkPartition(view, false); - int id = view.getViewId(); InstallViewMessage msg = new InstallViewMessage(view, services.getAuthenticator().getCredentials(this.localAddress), preparing); Set recips = new HashSet<>(view.getMembers()); @@ -954,6 +953,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { } } + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="WA_NOT_IN_LOOP") boolean findCoordinatorFromView() { ArrayList result; SearchState state = searchState; @@ -1799,7 +1799,9 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { } // use the new view as the initial view - setInitialView(newView, newMembers, initialLeaving, initialRemovals); + synchronized(this) { + setInitialView(newView, newMembers, initialLeaving, initialRemovals); + } } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2f0c7fcb/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java index 36a6200..bdf13b5 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java @@ -19,20 +19,16 @@ package com.gemstone.gemfire.distributed.internal.membership.gms.messenger; import static com.gemstone.gemfire.distributed.internal.membership.gms.GMSUtil.replaceStrings; import static com.gemstone.gemfire.internal.DataSerializableFixedID.JOIN_REQUEST; import static com.gemstone.gemfire.internal.DataSerializableFixedID.JOIN_RESPONSE; -import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; import java.io.BufferedReader; import java.io.ByteArrayInputStream; import java.io.DataInputStream; -import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; -import java.io.OutputStream; import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; -import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Arrays; @@ -44,6 +40,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; import org.apache.logging.log4j.Logger; import org.jgroups.Address; @@ -52,12 +49,12 @@ import org.jgroups.JChannel; import org.jgroups.Message; import org.jgroups.Message.Flag; import org.jgroups.Message.TransientFlag; -import org.jgroups.Receiver; import org.jgroups.ReceiverAdapter; import org.jgroups.View; import org.jgroups.ViewId; import org.jgroups.conf.ClassConfigurator; import org.jgroups.protocols.UDP; +import org.jgroups.protocols.pbcast.NAKACK2; import org.jgroups.stack.IpAddress; import org.jgroups.util.Digest; import org.jgroups.util.UUID; @@ -97,6 +94,8 @@ import com.gemstone.gemfire.internal.i18n.LocalizedStrings; import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; import com.gemstone.gemfire.internal.tcp.MemberShunnedException; +import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; + public class JGroupsMessenger implements Messenger { private static final Logger logger = Services.getLogger(); @@ -109,7 +108,7 @@ public class JGroupsMessenger implements Messenger { /** * The location (in the product) of the mcast Jgroups config file. */ - private static final String DEFAULT_JGROUPS_MCAST_CONFIG = "com/gemstone/gemfire/distributed/internal/membership/gms/messenger/jgroups-mcast.xml"; + private static final String JGROUPS_MCAST_CONFIG_FILE_NAME = "com/gemstone/gemfire/distributed/internal/membership/gms/messenger/jgroups-mcast.xml"; /** JG magic numbers for types added to the JG ClassConfigurator */ public static final short JGROUPS_TYPE_JGADDRESS = 2000; @@ -127,13 +126,11 @@ public class JGroupsMessenger implements Messenger { /** handlers that receive certain classes of messages instead of the Manager */ Map handlers = new ConcurrentHashMap(); - Object nakackDigest; - private volatile NetView view; private GMSPingPonger pingPonger = new GMSPingPonger(); - protected volatile long pongsReceived; + protected AtomicLong pongsReceived = new AtomicLong(0); /** * A set that contains addresses that we have logged JGroups IOExceptions for in the @@ -151,6 +148,7 @@ public class JGroupsMessenger implements Messenger { } @Override + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD") public void init(Services s) { this.services = s; @@ -170,7 +168,7 @@ public class JGroupsMessenger implements Messenger { String r = null; if (transport.isMcastEnabled()) { - r = DEFAULT_JGROUPS_MCAST_CONFIG; + r = JGROUPS_MCAST_CONFIG_FILE_NAME; } else { r = DEFAULT_JGROUPS_TCP_CONFIG; } @@ -248,6 +246,7 @@ public class JGroupsMessenger implements Messenger { } @Override + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD") public void start() { // create the configuration XML string for JGroups String properties = this.jgStackConfig; @@ -303,8 +302,8 @@ public class JGroupsMessenger implements Messenger { throw new SystemConnectException("unable to create jgroups channel", e); } - if (THROW_EXCEPTION_ON_START_HOOK) { - THROW_EXCEPTION_ON_START_HOOK = false; + if (JGroupsMessenger.THROW_EXCEPTION_ON_START_HOOK) { + JGroupsMessenger.THROW_EXCEPTION_ON_START_HOOK = false; throw new SystemConnectException("failing for test"); } @@ -490,7 +489,7 @@ public class JGroupsMessenger implements Messenger { @Override public boolean testMulticast(long timeout) throws InterruptedException { - long pongsSnapshot = pongsReceived; + long pongsSnapshot = pongsReceived.longValue(); JGAddress dest = null; try { pingPonger.sendPingMessage(myChannel, jgAddress, dest); @@ -500,10 +499,48 @@ public class JGroupsMessenger implements Messenger { return false; } long giveupTime = System.currentTimeMillis() + timeout; - while (pongsReceived == pongsSnapshot && System.currentTimeMillis() < giveupTime) { + while (pongsReceived.longValue() == pongsSnapshot && System.currentTimeMillis() < giveupTime) { Thread.sleep(100); } - return pongsReceived > pongsSnapshot; + return pongsReceived.longValue() > pongsSnapshot; + } + + @Override + public void getMessageState(InternalDistributedMember target, Map state, boolean includeMulticast) { + if (includeMulticast) { + NAKACK2 nakack = (NAKACK2)myChannel.getProtocolStack().findProtocol("NAKACK2"); + if (nakack != null) { + long seqno = nakack.getCurrentSeqno(); + state.put("JGroups.mcastState", Long.valueOf(seqno)); + } + } + } + + @Override + public void waitForMessageState(InternalDistributedMember sender, Map state) throws InterruptedException { + NAKACK2 nakack = (NAKACK2)myChannel.getProtocolStack().findProtocol("NAKACK2"); + Long seqno = (Long)state.get("JGroups.mcastState"); + if (nakack != null && seqno != null) { + waitForMessageState(nakack, sender, seqno); + } + } + + /** + * wait for the mcast state from the given member to reach the given seqno + */ + protected void waitForMessageState(NAKACK2 nakack, InternalDistributedMember sender, Long seqno) + throws InterruptedException { + JGAddress jgSender = new JGAddress(sender); + Digest digest = nakack.getDigest(jgSender); + if (digest != null) { + for (;;) { + long[] senderSeqnos = digest.get(jgSender); + if (senderSeqnos == null || senderSeqnos[0] >= seqno.longValue()) { + break; + } + Thread.sleep(50); + } + } } @Override @@ -985,7 +1022,7 @@ public class JGroupsMessenger implements Messenger { } return; } else if (pingPonger.isPongMessage(contents)) { - pongsReceived++; + pongsReceived.incrementAndGet(); return; } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2f0c7fcb/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java index e5835c4..0b7a544 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java @@ -39,7 +39,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.logging.log4j.Logger; -import com.gemstone.gemfire.CancelCriterion; import com.gemstone.gemfire.CancelException; import com.gemstone.gemfire.ForcedDisconnectException; import com.gemstone.gemfire.GemFireConfigException; @@ -2366,17 +2365,18 @@ public class GMSMembershipManager implements MembershipManager, Manager /* non-thread-owned serial channels and high priority channels are not * included */ - public Map getChannelStates(DistributedMember member, boolean includeMulticast) { - HashMap result = new HashMap(); + public Map getMessageState(DistributedMember member, boolean includeMulticast) { + Map result = new HashMap(); Stub stub = (Stub)memberToStubMap.get(member); DirectChannel dc = directChannel; if (stub != null && dc != null) { dc.getChannelStates(stub, result); } + services.getMessenger().getMessageState((InternalDistributedMember)member, result, includeMulticast); return result; } - public void waitForChannelState(DistributedMember otherMember, Map channelState) + public void waitForMessageState(DistributedMember otherMember, Map state) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); @@ -2389,15 +2389,9 @@ public class GMSMembershipManager implements MembershipManager, Manager latestViewLock.writeLock().unlock(); } if (dc != null && stub != null) { - dc.waitForChannelState(stub, channelState); - } -// Long mcastState = (Long)channelState.remove("JGroups.MCast"); -// if (mcastState != null) { -// InternalDistributedMember idm = (InternalDistributedMember)otherMember; -// GMSMember jgm = (GMSMember)idm.getNetMember(); -// Address other = jgm.getAddress(); -// gms.waitForMulticastState(other, mcastState.longValue()); -// } + dc.waitForChannelState(stub, state); + } + services.getMessenger().waitForMessageState((InternalDistributedMember)otherMember, state); } /* http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2f0c7fcb/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/StateFlushOperation.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/StateFlushOperation.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/StateFlushOperation.java index 8d3ea60..e56c126 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/StateFlushOperation.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/StateFlushOperation.java @@ -131,7 +131,7 @@ public class StateFlushOperation { gr.setRecipient(target); ReplyProcessor21 processor = new ReplyProcessor21(dm, target); gr.processorId = processor.getProcessorId(); - gr.channelState = dm.getMembershipManager().getChannelStates(target, false); + gr.channelState = dm.getMembershipManager().getMessageState(target, false); if (logger.isTraceEnabled(LogMarker.STATE_FLUSH_OP) && ((gr.channelState != null) && (gr.channelState.size() > 0)) ) { logger.trace(LogMarker.STATE_FLUSH_OP, "channel states: {}", gr.channelStateDescription(gr.channelState)); } @@ -410,7 +410,7 @@ public class StateFlushOperation { boolean useMulticast = r.getMulticastEnabled() && r.getSystem().getConfig().getMcastPort() != 0; if (initialized) { - Map channelStates = dm.getMembershipManager().getChannelStates(relayRecipient, useMulticast); + Map channelStates = dm.getMembershipManager().getMessageState(relayRecipient, useMulticast); if (gr.channelState != null) { gr.channelState.putAll(channelStates); } else { @@ -565,7 +565,7 @@ public class StateFlushOperation { dm.getCancelCriterion().checkCancelInProgress(null); boolean interrupted = Thread.interrupted(); try { - dm.getMembershipManager().waitForChannelState(getSender(), channelState); + dm.getMembershipManager().waitForMessageState(getSender(), channelState); break; } catch (InterruptedException e) { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2f0c7fcb/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java index c866797..bac356c 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java @@ -1080,7 +1080,7 @@ public class ConnectionTable { * @since 5.1 */ protected void getThreadOwnedOrderedConnectionState(Stub member, - HashMap result) { + Map result) { ConcurrentMap cm = this.threadConnectionMap; if (cm != null) { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2f0c7fcb/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/TCPConduit.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/TCPConduit.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/TCPConduit.java index 12a03fd..a2801c1 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/TCPConduit.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/TCPConduit.java @@ -808,7 +808,7 @@ public class TCPConduit implements Runnable { */ public void getThreadOwnedOrderedConnectionState( Stub member, - HashMap result) + Map result) { getConTable().getThreadOwnedOrderedConnectionState(member, result); } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2f0c7fcb/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java index 4b9c01f..c844583 100755 --- a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java +++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java @@ -26,15 +26,20 @@ import java.io.DataInputStream; import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Properties; import org.apache.commons.lang.SerializationException; +import org.jgroups.Address; import org.jgroups.Event; import org.jgroups.JChannel; import org.jgroups.Message; import org.jgroups.conf.ClassConfigurator; import org.jgroups.protocols.UNICAST3; +import org.jgroups.protocols.pbcast.NAKACK2; +import org.jgroups.util.Digest; import org.jgroups.util.UUID; import org.junit.After; import org.junit.Test; @@ -682,9 +687,9 @@ public class JGroupsMessengerJUnitTest { interceptor.collectedMessages.clear(); JGroupsReceiver receiver = (JGroupsReceiver)messenger.myChannel.getReceiver(); - long pongsReceived = messenger.pongsReceived; + long pongsReceived = messenger.pongsReceived.longValue(); receiver.receive(pongMessage); - assertEquals(pongsReceived+1, messenger.pongsReceived); + assertEquals(pongsReceived+1, messenger.pongsReceived.longValue()); receiver.receive(pingMessage); assertEquals("expected 1 message but found " + interceptor.collectedMessages, interceptor.collectedMessages.size(), 1); Message m = interceptor.collectedMessages.get(0); @@ -756,6 +761,61 @@ public class JGroupsMessengerJUnitTest { assertTrue(newMessenger.myChannel == messenger.myChannel); } + @Test + public void testGetMessageState() throws Exception { + initMocks(true/*multicast*/); + messenger.testMulticast(50); // do some multicast messaging + NAKACK2 nakack = (NAKACK2)messenger.myChannel.getProtocolStack().findProtocol("NAKACK2"); + assertNotNull(nakack); + long seqno = nakack.getCurrentSeqno(); + Map state = new HashMap(); + messenger.getMessageState(null, state, true); + assertEquals(1, state.size()); + Long stateLong = (Long)state.values().iterator().next(); + assertTrue("expected multicast state to be at least "+seqno+" but it was "+stateLong.longValue(), + stateLong.longValue() >= seqno); + } + + @Test + public void testGetMessageStateNoMulticast() throws Exception { + initMocks(false/*multicast*/); + Map state = new HashMap(); + messenger.getMessageState(null, state, true); + assertEquals("expected an empty map but received " + state, 0, state.size()); + } + + @Test + public void testWaitForMessageState() throws Exception { + initMocks(true/*multicast*/); + NAKACK2 nakack = mock(NAKACK2.class); + Digest digest = mock(Digest.class); + when(nakack.getDigest(any(Address.class))).thenReturn(digest); + when(digest.get(any(Address.class))).thenReturn( + new long[] {0,0}, new long[] {2, 50}, new long[] {49, 50}, new long[] {50, 80}, new long[] {80, 120}); + messenger.waitForMessageState(nakack, createAddress(1234), Long.valueOf(50)); + verify(digest, times(4)).get(isA(Address.class)); + + reset(digest); + when(digest.get(any(Address.class))).thenReturn( + new long[] {0,0}, new long[] {2, 50}, null); + messenger.waitForMessageState(nakack, createAddress(1234), Long.valueOf(50)); + verify(digest, times(3)).get(isA(Address.class)); + + // for code coverage let's invoke the other waitForMessageState method + Map state = new HashMap(); + state.put("JGroups.mcastState", Long.valueOf(10L)); + messenger.waitForMessageState(createAddress(1234), state); + } + + + @Test + public void testMulticastTest() throws Exception { + initMocks(true); + boolean result = messenger.testMulticast(50); + // this shouldln't succeed + assertFalse(result); + } + /** * creates an InternalDistributedMember address that can be used * with the doctored JGroups channel. This includes a logical