geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bschucha...@apache.org
Subject incubator-geode git commit: install the messenger's JGAddress into the JGroups stack so it will be transmitted with messages. Add conversion from JGAddress to GMSMember and InternalDistributedMember for setting the sender of a message. This shortens th
Date Tue, 28 Jul 2015 19:37:47 GMT
Repository: incubator-geode
Updated Branches:
  refs/heads/feature/GEODE-77 17a1f4dd7 -> 2af40c665


Install the messenger's JGAddress into the JGroups stack so it will be transmitted with messages.
Add conversion from JGAddress to GMSMember and InternalDistributedMember for setting the
sender of a message.  This shortens the length of JGroups messages by removing the GMSMember
that was being serialized at the beginning of every message.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/2af40c66
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/2af40c66
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/2af40c66

Branch: refs/heads/feature/GEODE-77
Commit: 2af40c6653858aa3048cb95187adeacdac13b0eb
Parents: 17a1f4d
Author: Bruce Schuchardt <bschuchardt@pivotal.io>
Authored: Tue Jul 28 12:37:16 2015 -0700
Committer: Bruce Schuchardt <bschuchardt@pivotal.io>
Committed: Tue Jul 28 12:37:16 2015 -0700

----------------------------------------------------------------------
 .../internal/membership/MembershipManager.java  |   5 +-
 .../internal/membership/gms/Services.java       |  21 ++-
 .../membership/gms/interfaces/Manager.java      |   6 +-
 .../gms/messenger/JGroupsMessenger.java         | 189 +++++++++++--------
 .../gms/mgr/GMSMembershipManager.java           |  53 ++----
 5 files changed, 150 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2af40c66/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/MembershipManager.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/MembershipManager.java
b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/MembershipManager.java
index 8b861b1..cfca7e8 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/MembershipManager.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/MembershipManager.java
@@ -8,15 +8,14 @@
 package com.gemstone.gemfire.distributed.internal.membership;
 
 import java.io.NotSerializableException;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeoutException;
 
 import com.gemstone.gemfire.SystemFailure;
 import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.distributed.internal.DMStats;
 import com.gemstone.gemfire.distributed.internal.DistributionMessage;
-import com.gemstone.gemfire.distributed.internal.DistributionStats;
 import com.gemstone.gemfire.internal.logging.InternalLogWriter;
 import com.gemstone.gemfire.internal.tcp.Stub;
 
@@ -137,7 +136,7 @@ public interface MembershipManager {
   public Set send(
       InternalDistributedMember[] destinations,
       DistributionMessage content,
-      DistributionStats stats)
+      DMStats stats)
   throws NotSerializableException;
   
   /**

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2af40c66/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/Services.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/Services.java
b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/Services.java
index 7b5aec7..9d9c720 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/Services.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/Services.java
@@ -1,5 +1,7 @@
 package com.gemstone.gemfire.distributed.internal.membership.gms;
 
+import java.util.Timer;
+
 import org.apache.logging.log4j.Logger;
 
 import com.gemstone.gemfire.CancelCriterion;
@@ -46,15 +48,31 @@ public class Services {
 
   private InternalLogWriter securityLogWriter;
   
+  private Timer timer = new Timer("Membership Timer", true);
+  
   
+
+  /**
+   * A common logger for membership classes
+   */
   public static Logger getLogger() {
     return logger;
   }
-  
+
+  /**
+   * The thread group for all membership threads
+   */
   public static ThreadGroup getThreadGroup() {
     return threadGroup;
   }
   
+  /**
+   * a timer used for membership tasks
+   */
+  public Timer getTimer() {
+    return this.timer;
+  }
+  
 
 
   protected Services(
@@ -126,6 +144,7 @@ public class Services {
     this.auth.stop();
     this.messenger.stop();
     this.manager.stop();
+    this.timer.cancel();
   }
   
   public static void setSecurityLogWriter(InternalLogWriter writer) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2af40c66/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Manager.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Manager.java
b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Manager.java
index 00ceeed..315b69f 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Manager.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Manager.java
@@ -1,6 +1,7 @@
 package com.gemstone.gemfire.distributed.internal.membership.gms.interfaces;
 
 import java.io.NotSerializableException;
+import java.util.Set;
 
 import com.gemstone.gemfire.distributed.DistributedMember;
 import com.gemstone.gemfire.distributed.internal.DistributionMessage;
@@ -23,10 +24,9 @@ public interface Manager extends Service, MessageHandler {
   /**
    * Sends a message using a selected distribution channel
    * (e.g. Messenger or DirectChannel)
+   * @return a set of recipients that did not receive the message
    */
-  void send(DistributionMessage m) throws NotSerializableException;
-
-  InternalDistributedMember getMemberID(NetMember m);
+  Set<InternalDistributedMember> send(DistributionMessage m) throws NotSerializableException;
 
   void forceDisconnect(String reason);
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2af40c66/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
index 7b0a5f9..ec8a0fa 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
@@ -237,7 +237,8 @@ public class JGroupsMessenger implements Messenger {
       throw new SystemConnectException("unable to create jgroups channel", e);
     }
     
-    findLocalAddress();
+    establishLocalAddress();
+    
     DistributionConfig config = services.getConfig().getDistributionConfig();
     boolean isLocator = (MemberAttributes.DEFAULT.getVmKind() == DistributionManager.LOCATOR_DM_TYPE);

     
@@ -289,7 +290,7 @@ public class JGroupsMessenger implements Messenger {
   
 
   
-  private void findLocalAddress() {
+  private void establishLocalAddress() {
     UUID logicalAddress = (UUID)myChannel.getAddress();
     
     IpAddress ipaddr = (IpAddress)myChannel.down(new Event(Event.GET_PHYSICAL_ADDRESS));
@@ -326,6 +327,7 @@ public class JGroupsMessenger implements Messenger {
         }
       }
     }
+    myChannel.down(new Event(Event.SET_LOCAL_ADDRESS, this.jgAddress));
   }
 
   @Override
@@ -534,7 +536,7 @@ public class JGroupsMessenger implements Messenger {
     }
     Message msg = new Message();
     msg.setDest(null);
-//    msg.setSrc(src.asIpAddress());
+    msg.setSrc(src);
     //log.info("Creating message with payload " + gfmsg);
     if (gfmsg instanceof com.gemstone.gemfire.internal.cache.DistributedCacheOperation.CacheOperationMessage)
{
       com.gemstone.gemfire.internal.cache.DistributedCacheOperation.CacheOperationMessage
cmsg =
@@ -551,8 +553,7 @@ public class JGroupsMessenger implements Messenger {
       HeapDataOutputStream out_stream =
         new HeapDataOutputStream(Version.fromOrdinalOrCurrent(version));
         Version.CURRENT.writeOrdinal(out_stream, true);
-        // TODO: preserialize this when the address is established
-        DataSerializer.writeObject(this.localAddress.getNetMember(), out_stream);
+//        DataSerializer.writeObject(this.localAddress.getNetMember(), out_stream);
         DataSerializer.writeObject(gfmsg, out_stream);
         msg.setBuffer(out_stream.toByteArray());
     }
@@ -566,11 +567,72 @@ public class JGroupsMessenger implements Messenger {
     return msg;
   }
 
+
+  /**
+   * deserialize a jgroups payload.  If it's a DistributionMessage find
+   * the ID of the sender and establish it as the message's sender
+   */
+  Object readJGMessage(Message jgmsg) {
+    Object result = null;
+    
+    int messageLength = jgmsg.getLength();
+    
+    if (messageLength == 0) {
+      // jgroups messages with no payload are used for protocol interchange, such
+      // as STABLE_GOSSIP
+      logger.debug("Message length is zero - ignoring");
+      return null;
+    }
+
+    InternalDistributedMember sender = null;
+
+    Exception problem = null;
+    try {
+      byte[] buf = jgmsg.getRawBuffer();
+      DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf, 
+          jgmsg.getOffset(), jgmsg.getLength()));
+
+      short ordinal = Version.readOrdinal(dis);
+      
+      if (ordinal < Version.CURRENT_ORDINAL) {
+        dis = new VersionedDataInputStream(dis, Version.fromOrdinalNoThrow(
+            ordinal, true));
+      }
+      
+      Address s = jgmsg.getSrc();
+      sender = getMemberFromView(s, ordinal);
+
+      result = DataSerializer.readObject(dis);
+      if (result instanceof DistributionMessage) {
+        ((DistributionMessage) result).setSender(sender);
+      }
+
+      logger.debug("JGroupsReceiver deserialized {}", result);
+
+    }
+    catch (ClassNotFoundException e) {
+      problem = e;
+    }
+    catch (IOException e) {
+      problem = e;
+    }
+    catch (RuntimeException e) {
+      problem = e;
+    }
+    if (problem != null) {
+      logger.error(LocalizedMessage.create(
+            LocalizedStrings.GroupMembershipService_EXCEPTION_DESERIALIZING_MESSAGE_PAYLOAD_0,
jgmsg), problem);
+      return null;
+    }
+
+    return result;
+  }
+  
   
   /** look for certain messages that may need to be altered before being sent */
   private void filterMessage(DistributionMessage m) {
     if (m instanceof JoinResponseMessage) {
-      // TODO: does the new JGroups need to have the NAKACK digest transmitted
+      // TODO: for mcast does the new JGroups need to have the NAKACK digest transmitted
       // to new members at join-time?  The old JGroups needs this and it would require us
to
       // install an uphandler for JChannel to handle GET_DIGEST_OK events.
       // I (bruce) am postponing looking into this until we move to the new version of jgroups.
@@ -582,6 +644,45 @@ public class JGroupsMessenger implements Messenger {
     return localAddress;
   }
 
+  /**
+   * returns the member ID for the given JGroups address
+   */
+  private InternalDistributedMember getMemberFromView(Address jgId, short version) {
+    NetView v = services.getJoinLeave().getView();
+    GMSMember gm = null;
+    
+    if ( !(jgId instanceof JGAddress) ) {
+      // not one of our addresses - gather info from JGroups to form
+      // a GMSAddress or fish for the ID using the UUID
+      IpAddress pa = (IpAddress)myChannel.down(new Event(Event.GET_PHYSICAL_ADDRESS, jgId));
+      if (pa == null) {
+        // worst-case scenario - we only have a UUID
+        for (InternalDistributedMember m: v.getMembers()) {
+          if (((GMSMember)m.getNetMember()).getUUID().equals(jgId)) {
+            return m;
+          }
+        }
+      }
+      gm = new GMSMember(pa.getIpAddress(), pa.getPort(),
+          false/*unknown*/, false/*unknown*/, version);
+    }
+    else {
+      JGAddress addr = (JGAddress)jgId;
+      gm = new GMSMember(addr.getInetAddress(), addr.getPort(),
+          false/*unknown*/, false/*unknown*/, version);
+    }
+    
+    if (v != null) {
+      for (InternalDistributedMember m: v.getMembers()) {
+        if (m.getNetMember().equals(jgId)) {
+          return m;
+        }
+      }
+    }
+    return new InternalDistributedMember(gm);
+  }
+
+
   @Override
   public void emergencyClose() {
     this.view = null;
@@ -622,49 +723,7 @@ public class JGroupsMessenger implements Messenger {
 
       logger.debug("JGroupsReceiver received {} headers: {}", jgmsg, jgmsg.getHeaders());
       
-      Object o = null;
-      int messageLength = jgmsg.getLength();
-      
-      if (messageLength == 0) {
-        // jgroups messages with no payload are used for protocol interchange, such
-        // as STABLE_GOSSIP
-        logger.debug("Message length is zero - ignoring");
-        return;
-      }
-
-      GMSMember sender = null;
-
-      Exception problem = null;
-      try {
-        byte[] buf = jgmsg.getRawBuffer();
-        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf, 
-            jgmsg.getOffset(), jgmsg.getLength()));
-
-        short ordinal = Version.readOrdinal(dis);
-        
-        if (ordinal < Version.CURRENT_ORDINAL) {
-          dis = new VersionedDataInputStream(dis, Version.fromOrdinalNoThrow(
-              ordinal, true));
-        }
-        
-        sender = DataSerializer.readObject(dis);
-        o = DataSerializer.readObject(dis);
-      }
-      catch (ClassNotFoundException e) {
-        problem = e;
-      }
-      catch (IOException e) {
-        problem = e;
-      }
-      catch (RuntimeException e) {
-        problem = e;
-      }
-      if (problem != null) {
-        logger.error(LocalizedMessage.create(
-              LocalizedStrings.GroupMembershipService_EXCEPTION_DESERIALIZING_MESSAGE_PAYLOAD_0,
jgmsg), problem);
-        return;
-      }
-      
+      Object o = readJGMessage(jgmsg);
       if (o == null) {
         logger.warn(LocalizedMessage.create(
             LocalizedStrings.GroupMembershipService_MEMBERSHIP_GEMFIRE_RECEIVED_NULL_MESSAGE_FROM__0,
String.valueOf(jgmsg)));
@@ -672,7 +731,7 @@ public class JGroupsMessenger implements Messenger {
             LocalizedStrings.GroupMembershipService_MEMBERSHIP_MESSAGE_HEADERS__0, jgmsg.printObjectHeaders()));
         return;
       } else if ( !(o instanceof DistributionMessage) ) {
-        logger.warn("Received something other than a message from " + sender + " ("+jgmsg.getSrc()+")
: " + o);
+        logger.warn("Received something other than a message from " + jgmsg.getSrc() + ":
" + o);
         return;
       }
 
@@ -689,19 +748,14 @@ public class JGroupsMessenger implements Messenger {
       }
 
       msg.resetTimestamp();
-      msg.setBytesRead(messageLength);
+      msg.setBytesRead(jgmsg.getLength());
             
-      if (sender == null) {
+      if (msg.getSender() == null) {
         Exception e = new Exception(LocalizedStrings.GroupMembershipService_NULL_SENDER.toLocalizedString());
         logger.warn(LocalizedMessage.create(
             LocalizedStrings.GroupMembershipService_MEMBERSHIP_GEMFIRE_RECEIVED_A_MESSAGE_WITH_NO_SENDER_ADDRESS),
e);
       }
       
-      InternalDistributedMember dm = getMemberFromView(sender);
-      msg.setSender(dm);
-
-      logger.debug("JGroupsReceiver deserialized {}", msg);
-
       try {
         MessageHandler h = getMessageHandler(msg);
         logger.debug("Handler for this message is {}", h);
@@ -737,27 +791,6 @@ public class JGroupsMessenger implements Messenger {
     }
     
     
-    /**
-     * returns the member ID for the given GMSMember object
-     */
-    private InternalDistributedMember getMemberFromView(GMSMember mbr) {
-      NetView v = services.getJoinLeave().getView();
-      InternalDistributedMember dm = null;
-      if (v != null) {
-        for (InternalDistributedMember m: v.getMembers()) {
-          if (m.getNetMember().equals(mbr)) {
-            dm = m;
-            break;
-          }
-        }
-      }
-      if (dm == null) {
-        dm = new InternalDistributedMember(mbr);
-      }
-      return dm;
-    }
-
-
     @Override
     public void block() {
     }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2af40c66/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
index b15caf9..39685fb 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
@@ -21,7 +21,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.Timer;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -109,7 +108,7 @@ public class GMSMembershipManager implements MembershipManager, Manager
    * here so it is available to the UDP protocol for passing off
    * the ping-pong responses used in the quorum-checking algorithm. 
    */
-  private volatile JGroupsQuorumChecker quorumChecker;
+  private volatile QuorumChecker quorumChecker;
   
   /**
    * during an auto-reconnect attempt set this to the old DistributedSystem's
@@ -118,12 +117,6 @@ public class GMSMembershipManager implements MembershipManager, Manager
   private volatile DatagramSocket oldDSUDPSocket;
   
   /**
-   * A general use timer
-   */
-  private Timer timer = new Timer("Membership Timer", true);
-  
-  
-  /**
    * Trick class to make the startup synch more
    * visible in stack traces
    * 
@@ -1720,8 +1713,6 @@ public class GMSMembershipManager implements MembershipManager, Manager
 //    stubToMemberMap.clear();
 //    memberToStubMap.clear();
     
-    this.timer.cancel();
-    
     if (DEBUG) {
       System.err.println("DEBUG: done closing GroupMembershipService");
     }
@@ -1942,9 +1933,9 @@ public class GMSMembershipManager implements MembershipManager, Manager
    * all received it)
    * @throws NotSerializableException if the message is not serializable
    */
-  private Set directChannelSend(InternalDistributedMember[] destinations,
+  private Set<InternalDistributedMember> directChannelSend(InternalDistributedMember[]
destinations,
       DistributionMessage content,
-      com.gemstone.gemfire.distributed.internal.DistributionStats theStats)
+      DMStats theStats)
       throws NotSerializableException
   {
     boolean allDestinations;
@@ -2003,7 +1994,7 @@ public class GMSMembershipManager implements MembershipManager, Manager
             new Object[] {content, member, view}), th);
 //        Assert.assertTrue(false, "messaging contract failure");
       }
-      return new HashSet(members);
+      return new HashSet<InternalDistributedMember>(members);
     } // catch ConnectionExceptions
     catch (ToDataException e) {
       throw e; // error in user data
@@ -2062,7 +2053,7 @@ public class GMSMembershipManager implements MembershipManager, Manager
    * probes.  It is made available here for the UDP protocol to
    * hand off ping-pong responses to the checker.
    */
-  public JGroupsQuorumChecker getQuorumCheckerImpl() {
+  public QuorumChecker getQuorumCheckerImpl() {
     return this.quorumChecker;
   }
   
@@ -2084,10 +2075,10 @@ public class GMSMembershipManager implements MembershipManager, Manager
       return this.quorumChecker;
     }
     try {
-      // TODO: we really need JChannel instead of a datagram socket because jgroup
-      // doesn't have the "ping" handling that I built into the TP protocol.  Maybe we should
just
-      // keep the JGroupsMessenger and use it to send PingMessages.  We'd want to
-      // bypass UNICAST and wipe out all message handlers except for the Pings.
+      // TODO: creation of the quorum checker should be delegated to the
+      // Messenger component.  For JGroups we really need JChannel instead
+      // of a datagram socket because JGroups
+      // doesn't have the "ping" handling that I built into the TP protocol.
       DatagramSocket sock = new DatagramSocket(this.address.getPort(),
                                this.address.getNetMember().getInetAddress());
       JGroupsQuorumChecker impl = new JGroupsQuorumChecker(
@@ -2123,7 +2114,7 @@ public class GMSMembershipManager implements MembershipManager, Manager
   
   public Set send(InternalDistributedMember[] destinations,
       DistributionMessage msg,
-      com.gemstone.gemfire.distributed.internal.DistributionStats theStats)
+      DMStats theStats)
       throws NotSerializableException
   {
     Set result = null;
@@ -2914,13 +2905,6 @@ public class GMSMembershipManager implements MembershipManager, Manager
     }
   }
   
-  /**
-   * returns the general purpose membership timer
-   */
-  public Timer getTimer() {
-    return this.timer;
-  }
-
   @Override
   public void stopped() {
   }
@@ -2933,24 +2917,15 @@ public class GMSMembershipManager implements MembershipManager, Manager
       }
       latestViewId = v.getViewId();
       latestView = v;
-      if (logger.isDebugEnabled()) {
-        logger.debug("MembershipManager: initial view is {}", latestView);
-      }
+      logger.debug("MembershipManager: initial view is {}", latestView);
     } else {
       handleOrDeferViewEvent(v);
     }
   }
 
   @Override
-  public void send(DistributionMessage m) {
-    // TODO Auto-generated method stub
-
-  }
-
-  @Override
-  public InternalDistributedMember getMemberID(NetMember m) {
-    // TODO Auto-generated method stub
-    return null;
+  public Set<InternalDistributedMember> send(DistributionMessage m) throws NotSerializableException
{
+    return send(m.getRecipients(), m, this.services.getStatistics());
   }
 
   @Override
@@ -3000,7 +2975,7 @@ public class GMSMembershipManager implements MembershipManager, Manager
 
   @Override
   public void setSecurityLogWriter(InternalLogWriter writer) {
-    services.setSecurityLogWriter(writer);
+    Services.setSecurityLogWriter(writer);
   }
 
 }


Mime
View raw message