geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bschucha...@apache.org
Subject incubator-geode git commit: StateFlushOperation flushing/waiting for multicast messages
Date Fri, 11 Dec 2015 23:09:23 GMT
Repository: incubator-geode
Updated Branches:
  refs/heads/develop eb685b45c -> 2f0c7fcb5


StateFlushOperation flushing/waiting for multicast messages

As part of the State Flush algorithm we record the state of the multicast
protocol in JGroups and send it to the initial image provider.  There the
information is used to wait for the on-wire multicast messages to be
received.

This change set also includes additional JGroupsMessenger tests for better code
coverage and fixes a few Find Bugs problems in that class and GMSJoinLeave.
One of these required replacing a volatile long with an AtomicLong because
the long was being incremented, which is not necessarily an atomic operation
on a volatile variable.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/2f0c7fcb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/2f0c7fcb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/2f0c7fcb

Branch: refs/heads/develop
Commit: 2f0c7fcb5bd46ecc37af9341f43e8aa3521048e3
Parents: eb685b4
Author: Bruce Schuchardt <bschuchardt@pivotal.io>
Authored: Fri Dec 11 15:06:46 2015 -0800
Committer: Bruce Schuchardt <bschuchardt@pivotal.io>
Committed: Fri Dec 11 15:09:08 2015 -0800

----------------------------------------------------------------------
 .../internal/direct/DirectChannel.java          |  2 +-
 .../internal/membership/MembershipManager.java  | 18 ++---
 .../membership/gms/interfaces/Messenger.java    | 22 +++++++
 .../membership/gms/membership/GMSJoinLeave.java | 10 +--
 .../gms/messenger/JGroupsMessenger.java         | 69 +++++++++++++++-----
 .../gms/mgr/GMSMembershipManager.java           | 20 ++----
 .../internal/cache/StateFlushOperation.java     |  6 +-
 .../gemfire/internal/tcp/ConnectionTable.java   |  2 +-
 .../gemfire/internal/tcp/TCPConduit.java        |  2 +-
 .../messenger/JGroupsMessengerJUnitTest.java    | 64 +++++++++++++++++-
 10 files changed, 165 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2f0c7fcb/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/DirectChannel.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/DirectChannel.java
b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/DirectChannel.java
index f84813e..14ff923 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/DirectChannel.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/DirectChannel.java
@@ -962,7 +962,7 @@ public class DirectChannel {
    *    the map to add the state to
    * @since 5.1
    */
-  public void getChannelStates(Stub member, HashMap result)
+  public void getChannelStates(Stub member, Map result)
   {
     TCPConduit tc = this.conduit;
     if (tc != null) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2f0c7fcb/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/MembershipManager.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/MembershipManager.java
b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/MembershipManager.java
index 54b82a7..a46680b 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/MembershipManager.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/MembershipManager.java
@@ -177,30 +177,30 @@ public interface MembershipManager {
   public boolean shutdownInProgress();
 
   /**
-   * Returns a serializable map of communication channel state for
+   * Returns a serializable map of communications state for
    * use in state stabilization.
    * @param member
-   *    the member whose channel state is to be captured
+   *    the member whose message state is to be captured
    * @param includeMulticast
-   *    whether the state of the mcast channel should be included
+   *    whether the state of the mcast messaging should be included
    * @return the current state of the communication channels between this
    *    process and the given distributed member
    * @since 5.1
    */
-  public Map getChannelStates(DistributedMember member, boolean includeMulticast);
+  public Map getMessageState(DistributedMember member, boolean includeMulticast);
 
   /**
-   * Waits for the given communication channels to reach the associated
+   * Waits for the given communications to reach the associated
    * state
    * @param member
-   *    The member whose channel state we're waiting for
-   * @param channelState
-   *    The channel states to wait for.  This should come from getChannelStates
+   *    The member whose messaging state we're waiting for
+   * @param state
+   *    The message states to wait for.  This should come from getMessageStates
    * @throws InterruptedException
    *    Thrown if the thread is interrupted
    * @since 5.1
    */
-  public void waitForChannelState(DistributedMember member, Map channelState)
+  public void waitForMessageState(DistributedMember member, Map state)
     throws InterruptedException;
   
   /**

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2f0c7fcb/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Messenger.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Messenger.java
b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Messenger.java
index 9def731..5bb6c4b 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Messenger.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Messenger.java
@@ -16,6 +16,7 @@
  */
 package com.gemstone.gemfire.distributed.internal.membership.gms.interfaces;
 
+import java.util.Map;
 import java.util.Set;
 
 import com.gemstone.gemfire.distributed.internal.DistributionMessage;
@@ -56,4 +57,25 @@ public interface Messenger extends Service {
    * @return true multicast is enabled and working
    */
   boolean testMulticast(long timeout) throws InterruptedException;
+
+  /**
+   * For the state-flush algorithm we need to be able to record
+   * the state of outgoing messages to the given member.  If multicast
+   * is being used for region operations we also need to record its
+   * state.
+   * 
+   * @param member the target member
+   * @param state messaging state is stored in this map
+   * @param includeMulticast whether to record multicast state
+   */
+  void getMessageState(InternalDistributedMember member, Map state, boolean includeMulticast);
+  
+  /**
+   * The flip-side of getMessageState, this method takes the state it recorded
+   * and waits for messages from the given member to be received.
+   * 
+   * @param member the member flushing operations to this member
+   * @param state the state of that member's outgoing messaging to this member
+   */
+  void waitForMessageState(InternalDistributedMember member, Map state) throws InterruptedException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2f0c7fcb/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
index 3a3486b..abdceb4 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
@@ -313,7 +313,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
    * @param coord
    * @return true if the attempt succeeded, false if it timed out
    */
-   boolean attemptToJoin() {
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="WA_NOT_IN_LOOP")
+  boolean attemptToJoin() {
     SearchState state = searchState;
 
     // send a join request to the coordinator and wait for a response
@@ -688,8 +689,6 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
 
   boolean sendView(NetView view, List<InternalDistributedMember> newMembers, boolean
preparing, ViewReplyProcessor rp) {
 
-    boolean isNetworkPartition = isNetworkPartition(view, false);
-    
     int id = view.getViewId();
     InstallViewMessage msg = new InstallViewMessage(view, services.getAuthenticator().getCredentials(this.localAddress),
preparing);
     Set<InternalDistributedMember> recips = new HashSet<>(view.getMembers());
@@ -954,6 +953,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
     }
   }    
 
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="WA_NOT_IN_LOOP")
   boolean findCoordinatorFromView() {
     ArrayList<FindCoordinatorResponse> result;
     SearchState state = searchState;
@@ -1799,7 +1799,9 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
         }
 
         // use the new view as the initial view
-        setInitialView(newView, newMembers, initialLeaving, initialRemovals);
+        synchronized(this) {
+          setInitialView(newView, newMembers, initialLeaving, initialRemovals);
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2f0c7fcb/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
index 36a6200..bdf13b5 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
@@ -19,20 +19,16 @@ package com.gemstone.gemfire.distributed.internal.membership.gms.messenger;
 import static com.gemstone.gemfire.distributed.internal.membership.gms.GMSUtil.replaceStrings;
 import static com.gemstone.gemfire.internal.DataSerializableFixedID.JOIN_REQUEST;
 import static com.gemstone.gemfire.internal.DataSerializableFixedID.JOIN_RESPONSE;
-import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 
 import java.io.BufferedReader;
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
-import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
-import java.io.OutputStream;
 import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
-import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -44,6 +40,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.logging.log4j.Logger;
 import org.jgroups.Address;
@@ -52,12 +49,12 @@ import org.jgroups.JChannel;
 import org.jgroups.Message;
 import org.jgroups.Message.Flag;
 import org.jgroups.Message.TransientFlag;
-import org.jgroups.Receiver;
 import org.jgroups.ReceiverAdapter;
 import org.jgroups.View;
 import org.jgroups.ViewId;
 import org.jgroups.conf.ClassConfigurator;
 import org.jgroups.protocols.UDP;
+import org.jgroups.protocols.pbcast.NAKACK2;
 import org.jgroups.stack.IpAddress;
 import org.jgroups.util.Digest;
 import org.jgroups.util.UUID;
@@ -97,6 +94,8 @@ import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
 import com.gemstone.gemfire.internal.tcp.MemberShunnedException;
 
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+
 public class JGroupsMessenger implements Messenger {
 
   private static final Logger logger = Services.getLogger();
@@ -109,7 +108,7 @@ public class JGroupsMessenger implements Messenger {
   /**
    * The location (in the product) of the mcast Jgroups config file.
    */
-  private static final String DEFAULT_JGROUPS_MCAST_CONFIG = "com/gemstone/gemfire/distributed/internal/membership/gms/messenger/jgroups-mcast.xml";
+  private static final String JGROUPS_MCAST_CONFIG_FILE_NAME = "com/gemstone/gemfire/distributed/internal/membership/gms/messenger/jgroups-mcast.xml";
 
   /** JG magic numbers for types added to the JG ClassConfigurator */
   public static final short JGROUPS_TYPE_JGADDRESS = 2000;
@@ -127,13 +126,11 @@ public class JGroupsMessenger implements Messenger {
   /** handlers that receive certain classes of messages instead of the Manager */
   Map<Class, MessageHandler> handlers = new ConcurrentHashMap<Class, MessageHandler>();
   
-  Object nakackDigest;
-
   private volatile NetView view;
 
   private GMSPingPonger pingPonger = new GMSPingPonger();
   
-  protected volatile long pongsReceived;
+  protected AtomicLong pongsReceived = new AtomicLong(0);
   
   /**
    * A set that contains addresses that we have logged JGroups IOExceptions for in the
@@ -151,6 +148,7 @@ public class JGroupsMessenger implements Messenger {
   }
 
   @Override
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD")
   public void init(Services s) {
     this.services = s;
 
@@ -170,7 +168,7 @@ public class JGroupsMessenger implements Messenger {
 
     String r = null;
     if (transport.isMcastEnabled()) {
-      r = DEFAULT_JGROUPS_MCAST_CONFIG;
+      r = JGROUPS_MCAST_CONFIG_FILE_NAME;
     } else {
       r = DEFAULT_JGROUPS_TCP_CONFIG;
     }
@@ -248,6 +246,7 @@ public class JGroupsMessenger implements Messenger {
   }
 
   @Override
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD")
   public void start() {
     // create the configuration XML string for JGroups
     String properties = this.jgStackConfig;
@@ -303,8 +302,8 @@ public class JGroupsMessenger implements Messenger {
       throw new SystemConnectException("unable to create jgroups channel", e);
     }
     
-    if (THROW_EXCEPTION_ON_START_HOOK) {
-      THROW_EXCEPTION_ON_START_HOOK = false;
+    if (JGroupsMessenger.THROW_EXCEPTION_ON_START_HOOK) {
+      JGroupsMessenger.THROW_EXCEPTION_ON_START_HOOK = false;
       throw new SystemConnectException("failing for test");
     }
     
@@ -490,7 +489,7 @@ public class JGroupsMessenger implements Messenger {
   
   @Override
   public boolean testMulticast(long timeout) throws InterruptedException {
-    long pongsSnapshot = pongsReceived;
+    long pongsSnapshot = pongsReceived.longValue();
     JGAddress dest = null;
     try {
       pingPonger.sendPingMessage(myChannel, jgAddress, dest);
@@ -500,10 +499,48 @@ public class JGroupsMessenger implements Messenger {
       return false;
     }
     long giveupTime = System.currentTimeMillis() + timeout;
-    while (pongsReceived == pongsSnapshot && System.currentTimeMillis() < giveupTime)
{
+    while (pongsReceived.longValue() == pongsSnapshot && System.currentTimeMillis()
< giveupTime) {
       Thread.sleep(100);
     }
-    return pongsReceived > pongsSnapshot;
+    return pongsReceived.longValue() > pongsSnapshot;
+  }
+  
+  @Override
+  public void getMessageState(InternalDistributedMember target, Map state, boolean includeMulticast)
{
+    if (includeMulticast) {
+      NAKACK2 nakack = (NAKACK2)myChannel.getProtocolStack().findProtocol("NAKACK2");
+      if (nakack != null) {
+        long seqno = nakack.getCurrentSeqno();
+        state.put("JGroups.mcastState", Long.valueOf(seqno));
+      }
+    }
+  }
+  
+  @Override
+  public void waitForMessageState(InternalDistributedMember sender, Map state) throws InterruptedException
{
+    NAKACK2 nakack = (NAKACK2)myChannel.getProtocolStack().findProtocol("NAKACK2");
+    Long seqno = (Long)state.get("JGroups.mcastState");
+    if (nakack != null && seqno != null) {
+      waitForMessageState(nakack, sender, seqno);
+    }
+  }
+  
+  /**
+   * wait for the mcast state from the given member to reach the given seqno 
+   */
+  protected void waitForMessageState(NAKACK2 nakack, InternalDistributedMember sender, Long
seqno)
+    throws InterruptedException {
+    JGAddress jgSender = new JGAddress(sender);
+    Digest digest = nakack.getDigest(jgSender);
+    if (digest != null) {
+      for (;;) {
+        long[] senderSeqnos = digest.get(jgSender);
+        if (senderSeqnos == null || senderSeqnos[0] >= seqno.longValue()) {
+          break;
+        }
+        Thread.sleep(50);
+      }
+    }
   }
 
   @Override
@@ -985,7 +1022,7 @@ public class JGroupsMessenger implements Messenger {
         }
         return;
       } else if (pingPonger.isPongMessage(contents)) {
-        pongsReceived++;
+        pongsReceived.incrementAndGet();
         return;
       }
       

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2f0c7fcb/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
index e5835c4..0b7a544 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
@@ -39,7 +39,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.logging.log4j.Logger;
 
-import com.gemstone.gemfire.CancelCriterion;
 import com.gemstone.gemfire.CancelException;
 import com.gemstone.gemfire.ForcedDisconnectException;
 import com.gemstone.gemfire.GemFireConfigException;
@@ -2366,17 +2365,18 @@ public class GMSMembershipManager implements MembershipManager, Manager
   /* non-thread-owned serial channels and high priority channels are not
    * included
    */
-  public Map getChannelStates(DistributedMember member, boolean includeMulticast) {
-    HashMap result = new HashMap();
+  public Map getMessageState(DistributedMember member, boolean includeMulticast) {
+    Map result = new HashMap();
     Stub stub = (Stub)memberToStubMap.get(member);
     DirectChannel dc = directChannel;
     if (stub != null && dc != null) {
       dc.getChannelStates(stub, result);
     }
+    services.getMessenger().getMessageState((InternalDistributedMember)member, result, includeMulticast);
     return result;
   }
 
-  public void waitForChannelState(DistributedMember otherMember, Map channelState)
+  public void waitForMessageState(DistributedMember otherMember, Map state)
     throws InterruptedException
   {
     if (Thread.interrupted()) throw new InterruptedException();
@@ -2389,15 +2389,9 @@ public class GMSMembershipManager implements MembershipManager, Manager
       latestViewLock.writeLock().unlock();
     }
     if (dc != null && stub != null) {
-      dc.waitForChannelState(stub, channelState);
-    }
-//    Long mcastState = (Long)channelState.remove("JGroups.MCast");
-//    if (mcastState != null) {
-//      InternalDistributedMember idm = (InternalDistributedMember)otherMember;
-//      GMSMember jgm = (GMSMember)idm.getNetMember();
-//      Address other = jgm.getAddress();
-//      gms.waitForMulticastState(other, mcastState.longValue());
-//    }
+      dc.waitForChannelState(stub, state);
+    }
+    services.getMessenger().waitForMessageState((InternalDistributedMember)otherMember, state);
   }
   
   /* 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2f0c7fcb/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/StateFlushOperation.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/StateFlushOperation.java
b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/StateFlushOperation.java
index 8d3ea60..e56c126 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/StateFlushOperation.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/StateFlushOperation.java
@@ -131,7 +131,7 @@ public class StateFlushOperation  {
       gr.setRecipient(target);
       ReplyProcessor21 processor = new ReplyProcessor21(dm, target);
       gr.processorId = processor.getProcessorId();
-      gr.channelState = dm.getMembershipManager().getChannelStates(target, false);
+      gr.channelState = dm.getMembershipManager().getMessageState(target, false);
       if (logger.isTraceEnabled(LogMarker.STATE_FLUSH_OP) && ((gr.channelState !=
null) && (gr.channelState.size() > 0)) ) {
         logger.trace(LogMarker.STATE_FLUSH_OP, "channel states: {}", gr.channelStateDescription(gr.channelState));
       }
@@ -410,7 +410,7 @@ public class StateFlushOperation  {
               boolean useMulticast = r.getMulticastEnabled()
                                     && r.getSystem().getConfig().getMcastPort() !=
0;
               if (initialized) {
-                Map channelStates = dm.getMembershipManager().getChannelStates(relayRecipient,
useMulticast);
+                Map channelStates = dm.getMembershipManager().getMessageState(relayRecipient,
useMulticast);
                 if (gr.channelState != null) {
                   gr.channelState.putAll(channelStates);
                 } else {
@@ -565,7 +565,7 @@ public class StateFlushOperation  {
                 dm.getCancelCriterion().checkCancelInProgress(null);
                 boolean interrupted = Thread.interrupted();
                 try {
-                  dm.getMembershipManager().waitForChannelState(getSender(), channelState);
+                  dm.getMembershipManager().waitForMessageState(getSender(), channelState);
                   break;
                 }
                 catch (InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2f0c7fcb/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java
b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java
index c866797..bac356c 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java
@@ -1080,7 +1080,7 @@ public class ConnectionTable  {
    * @since 5.1
    */
   protected void getThreadOwnedOrderedConnectionState(Stub member,
-      HashMap result) {
+      Map result) {
 
     ConcurrentMap cm = this.threadConnectionMap;
     if (cm != null) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2f0c7fcb/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/TCPConduit.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/TCPConduit.java
b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/TCPConduit.java
index 12a03fd..a2801c1 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/TCPConduit.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/TCPConduit.java
@@ -808,7 +808,7 @@ public class TCPConduit implements Runnable {
    */
   public void getThreadOwnedOrderedConnectionState(
     Stub member,
-    HashMap result)
+    Map result)
   {
     getConTable().getThreadOwnedOrderedConnectionState(member, result);
   }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2f0c7fcb/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
index 4b9c01f..c844583 100755
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
@@ -26,15 +26,20 @@ import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 import org.apache.commons.lang.SerializationException;
+import org.jgroups.Address;
 import org.jgroups.Event;
 import org.jgroups.JChannel;
 import org.jgroups.Message;
 import org.jgroups.conf.ClassConfigurator;
 import org.jgroups.protocols.UNICAST3;
+import org.jgroups.protocols.pbcast.NAKACK2;
+import org.jgroups.util.Digest;
 import org.jgroups.util.UUID;
 import org.junit.After;
 import org.junit.Test;
@@ -682,9 +687,9 @@ public class JGroupsMessengerJUnitTest {
 
     interceptor.collectedMessages.clear();
     JGroupsReceiver receiver = (JGroupsReceiver)messenger.myChannel.getReceiver();
-    long pongsReceived = messenger.pongsReceived;
+    long pongsReceived = messenger.pongsReceived.longValue();
     receiver.receive(pongMessage);
-    assertEquals(pongsReceived+1, messenger.pongsReceived);
+    assertEquals(pongsReceived+1, messenger.pongsReceived.longValue());
     receiver.receive(pingMessage);
     assertEquals("expected 1 message but found " + interceptor.collectedMessages, interceptor.collectedMessages.size(),
1);
     Message m = interceptor.collectedMessages.get(0);
@@ -756,6 +761,61 @@ public class JGroupsMessengerJUnitTest {
     assertTrue(newMessenger.myChannel == messenger.myChannel);
   }
   
+  @Test
+  public void testGetMessageState() throws Exception {
+    initMocks(true/*multicast*/);
+    messenger.testMulticast(50); // do some multicast messaging
+    NAKACK2 nakack = (NAKACK2)messenger.myChannel.getProtocolStack().findProtocol("NAKACK2");
+    assertNotNull(nakack);
+    long seqno = nakack.getCurrentSeqno();
+    Map state = new HashMap();
+    messenger.getMessageState(null, state, true);
+    assertEquals(1, state.size());
+    Long stateLong = (Long)state.values().iterator().next();
+    assertTrue("expected multicast state to be at least "+seqno+" but it was "+stateLong.longValue(),
+        stateLong.longValue() >= seqno);
+  }
+  
+  @Test
+  public void testGetMessageStateNoMulticast() throws Exception {
+    initMocks(false/*multicast*/);
+    Map state = new HashMap();
+    messenger.getMessageState(null, state, true);
+    assertEquals("expected an empty map but received " + state, 0, state.size());
+  }
+  
+  @Test
+  public void testWaitForMessageState() throws Exception {
+    initMocks(true/*multicast*/);
+    NAKACK2 nakack = mock(NAKACK2.class);
+    Digest digest = mock(Digest.class);
+    when(nakack.getDigest(any(Address.class))).thenReturn(digest);
+    when(digest.get(any(Address.class))).thenReturn(
+        new long[] {0,0}, new long[] {2, 50}, new long[] {49, 50}, new long[] {50, 80}, new
long[] {80, 120});
+    messenger.waitForMessageState(nakack, createAddress(1234), Long.valueOf(50));
+    verify(digest, times(4)).get(isA(Address.class));
+    
+    reset(digest);
+    when(digest.get(any(Address.class))).thenReturn(
+        new long[] {0,0}, new long[] {2, 50}, null);
+    messenger.waitForMessageState(nakack, createAddress(1234), Long.valueOf(50));
+    verify(digest, times(3)).get(isA(Address.class));
+    
+    // for code coverage let's invoke the other waitForMessageState method
+    Map state = new HashMap();
+    state.put("JGroups.mcastState", Long.valueOf(10L));
+    messenger.waitForMessageState(createAddress(1234), state);
+  }
+  
+
+  @Test
+  public void testMulticastTest() throws Exception {
+    initMocks(true);
+    boolean result = messenger.testMulticast(50);
+    // this shouldln't succeed
+    assertFalse(result);
+  }
+  
   /**
    * creates an InternalDistributedMember address that can be used
    * with the doctored JGroups channel.  This includes a logical


Mime
View raw message