geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hiteshkhame...@apache.org
Subject [2/3] incubator-geode git commit: GEODE-1372 added unit test and some more fixes.
Date Wed, 01 Jun 2016 22:29:24 GMT
GEODE-1372 added unit test and some more fixes.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/b6a73441
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/b6a73441
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/b6a73441

Branch: refs/heads/feature/GEODE-1372
Commit: b6a734415d1bec96c1f84a51b1fca16165bab0ae
Parents: 4d5f947
Author: Hitesh Khamesra <hiteshk25@yahoo.com>
Authored: Wed Jun 1 15:27:24 2016 -0700
Committer: Hitesh Khamesra <hiteshk25@yahoo.com>
Committed: Wed Jun 1 15:27:24 2016 -0700

----------------------------------------------------------------------
 .../internal/membership/NetView.java            |   4 +-
 .../membership/gms/interfaces/Messenger.java    |  12 +
 .../gms/locator/FindCoordinatorRequest.java     |  22 +-
 .../gms/locator/FindCoordinatorResponse.java    |  70 ++-
 .../membership/gms/locator/GMSLocator.java      |  12 +-
 .../membership/gms/membership/GMSJoinLeave.java | 151 +++++--
 .../gms/messages/InstallViewMessage.java        |  25 ++
 .../gms/messages/JoinRequestMessage.java        |  49 ++-
 .../gms/messages/JoinResponseMessage.java       |  63 ++-
 .../membership/gms/messenger/GMSEncrypt.java    |  82 ++--
 .../gms/messenger/JGroupsMessenger.java         | 431 +++++++++++++------
 .../internal/InternalDataSerializer.java        |  18 +
 .../DistributedMulticastRegionDUnitTest.java    |   2 +
 .../gemfire/distributed/LocatorDUnitTest.java   |  77 ++--
 .../LocatorUDPSecurityDUnitTest.java            |  28 ++
 .../gms/membership/GMSJoinLeaveJUnitTest.java   |  43 +-
 .../gms/messenger/GMSEncryptJUnitTest.java      |   6 +-
 .../messenger/JGroupsMessengerJUnitTest.java    | 199 ++++++++-
 18 files changed, 1010 insertions(+), 284 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b6a73441/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java
index 365a193..92fbcac 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java
@@ -586,7 +586,7 @@ public class NetView implements DataSerializableFixedID {
     InternalDataSerializer.writeSet(crashedMembers, out);
     DataSerializer.writeIntArray(failureDetectionPorts, out);
     // TODO expensive serialization
-    DataSerializer.writeObject(publicKeys, out);
+    DataSerializer.writeHashMap(publicKeys, out);
   }
 
   @Override
@@ -598,7 +598,7 @@ public class NetView implements DataSerializableFixedID {
     shutdownMembers = InternalDataSerializer.readHashSet(in);
     crashedMembers = InternalDataSerializer.readHashSet(in);
     failureDetectionPorts = DataSerializer.readIntArray(in);
-    publicKeys = DataSerializer.readObject(in);
+    publicKeys = DataSerializer.readHashMap(in);
   }
 
   /** this will deserialize as an ArrayList */

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b6a73441/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Messenger.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Messenger.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Messenger.java
index e10f325..3e9a2dc 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Messenger.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Messenger.java
@@ -86,4 +86,16 @@ public interface Messenger extends Service {
    * @param state the state of that member's outgoing messaging to this member
    */
   void waitForMessageState(InternalDistributedMember member, Map state) throws InterruptedException;
+  
+  byte[] getPublickey(InternalDistributedMember mbr);
+  
+  void setPublicKey(byte[] publickey, InternalDistributedMember mbr);
+  
+  void setClusterSecretKey(byte[] clusterSecretKey);
+  
+  byte[] getClusterSecretKey();
+  
+  int getRequestId();
+  
+  void initClusterKey();
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b6a73441/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorRequest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorRequest.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorRequest.java
index 5c0a1d1..c434c25 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorRequest.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorRequest.java
@@ -27,6 +27,7 @@ import com.gemstone.gemfire.distributed.internal.DistributionManager;
 import com.gemstone.gemfire.distributed.internal.HighPriorityDistributionMessage;
 import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
 import com.gemstone.gemfire.internal.DataSerializableFixedID;
+import com.gemstone.gemfire.internal.InternalDataSerializer;
 import com.gemstone.gemfire.internal.Version;
 
 public class FindCoordinatorRequest extends HighPriorityDistributionMessage
@@ -35,15 +36,20 @@ public class FindCoordinatorRequest extends HighPriorityDistributionMessage
   private InternalDistributedMember memberID;
   private Collection<InternalDistributedMember> rejectedCoordinators;
   private int lastViewId;
-  
+  private byte[] myPublicKey;
+  private int requestId;   
+
   public FindCoordinatorRequest(InternalDistributedMember myId) {
     this.memberID = myId;
   }
   
-  public FindCoordinatorRequest(InternalDistributedMember myId, Collection<InternalDistributedMember> rejectedCoordinators, int lastViewId) {
+  public FindCoordinatorRequest(InternalDistributedMember myId, Collection<InternalDistributedMember> rejectedCoordinators, 
+      int lastViewId, byte[] pk, int requestId) {
     this.memberID = myId;
     this.rejectedCoordinators = rejectedCoordinators;
     this.lastViewId = lastViewId;
+    this.myPublicKey = pk;
+    this.requestId = requestId;
   }
   
   public FindCoordinatorRequest() {
@@ -54,6 +60,10 @@ public class FindCoordinatorRequest extends HighPriorityDistributionMessage
     return memberID;
   }
   
+  public byte[] getMyPublicKey() {
+    return myPublicKey;
+  }
+
   public Collection<InternalDistributedMember> getRejectedCoordinators() {
     return rejectedCoordinators;
   }
@@ -81,6 +91,10 @@ public class FindCoordinatorRequest extends HighPriorityDistributionMessage
   public int getDSFID() {
     return FIND_COORDINATOR_REQ;
   }
+  
+  public int getRequestId() {
+    return requestId;
+  }
 
   @Override
   public void toData(DataOutput out) throws IOException {
@@ -94,6 +108,8 @@ public class FindCoordinatorRequest extends HighPriorityDistributionMessage
       out.writeInt(0);
     }
     out.writeInt(lastViewId);
+    out.writeInt(requestId);
+    InternalDataSerializer.writeByteArray(this.myPublicKey, out);
   }
 
   @Override
@@ -105,6 +121,8 @@ public class FindCoordinatorRequest extends HighPriorityDistributionMessage
       this.rejectedCoordinators.add((InternalDistributedMember)DataSerializer.readObject(in));
     }
     this.lastViewId = in.readInt();
+    this.requestId = in.readInt();
+    this.myPublicKey = InternalDataSerializer.readByteArray(in);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b6a73441/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorResponse.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorResponse.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorResponse.java
index 0427cb4..07f0e58 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorResponse.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorResponse.java
@@ -19,6 +19,7 @@ package com.gemstone.gemfire.distributed.internal.membership.gms.locator;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
 
@@ -42,12 +43,14 @@ public class FindCoordinatorResponse  extends HighPriorityDistributionMessage
   private boolean networkPartitionDetectionEnabled;
   private boolean usePreferredCoordinators;
   private boolean isShortForm;
-  
+  private byte[] coordinatorPublicKey;  
+
+  private int requestId;
   
   public FindCoordinatorResponse(InternalDistributedMember coordinator,
       InternalDistributedMember senderId,
       boolean fromView, NetView view, HashSet<InternalDistributedMember> registrants,
-      boolean networkPartitionDectionEnabled, boolean usePreferredCoordinators) {
+      boolean networkPartitionDectionEnabled, boolean usePreferredCoordinators, byte[] pk) {
     this.coordinator = coordinator;
     this.senderId = senderId;
     this.fromView = fromView;
@@ -56,19 +59,30 @@ public class FindCoordinatorResponse  extends HighPriorityDistributionMessage
     this.networkPartitionDetectionEnabled = networkPartitionDectionEnabled;
     this.usePreferredCoordinators = usePreferredCoordinators;
     this.isShortForm = false;
+    this.coordinatorPublicKey = pk;
   }
   
   public FindCoordinatorResponse(InternalDistributedMember coordinator,
-      InternalDistributedMember senderId) {
+      InternalDistributedMember senderId, byte[] pk, int requestId) {
     this.coordinator = coordinator;
     this.senderId = senderId;
     this.isShortForm = true;
+    this.coordinatorPublicKey = pk;
+    this.requestId = requestId;
   }
   
   public FindCoordinatorResponse() {
     // no-arg constructor for serialization
   }
 
+  public byte[] getCoordinatorPublicKey() {
+    return coordinatorPublicKey;
+  }
+  
+  public int getRequestId() {
+    return requestId;
+  }
+  
   public boolean isNetworkPartitionDetectionEnabled() {
     return networkPartitionDetectionEnabled;
   }
@@ -131,6 +145,7 @@ public class FindCoordinatorResponse  extends HighPriorityDistributionMessage
   public void toData(DataOutput out) throws IOException {
     DataSerializer.writeObject(coordinator, out);
     DataSerializer.writeObject(senderId, out);
+    InternalDataSerializer.writeByteArray(coordinatorPublicKey, out);
     out.writeBoolean(isShortForm);
     out.writeBoolean(fromView);
     out.writeBoolean(networkPartitionDetectionEnabled);
@@ -143,7 +158,8 @@ public class FindCoordinatorResponse  extends HighPriorityDistributionMessage
   public void fromData(DataInput in) throws IOException, ClassNotFoundException {
     coordinator = DataSerializer.readObject(in);
     senderId = DataSerializer.readObject(in);
-    isShortForm = in.readBoolean();
+    coordinatorPublicKey = InternalDataSerializer.readByteArray(in);
+    isShortForm = in.readBoolean();    
     if (!isShortForm) {
       fromView = in.readBoolean();
       networkPartitionDetectionEnabled = in.readBoolean();
@@ -158,4 +174,50 @@ public class FindCoordinatorResponse  extends HighPriorityDistributionMessage
     throw new IllegalStateException("this message should not be executed");
   }
 
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    FindCoordinatorResponse other = (FindCoordinatorResponse) obj;
+    if (coordinator == null) {
+      if (other.coordinator != null)
+        return false;
+    } else if (!coordinator.equals(other.coordinator))
+      return false;
+    if (!Arrays.equals(coordinatorPublicKey, other.coordinatorPublicKey))
+      return false;
+    if (fromView != other.fromView)
+      return false;
+    if (isShortForm != other.isShortForm)
+      return false;
+    if (networkPartitionDetectionEnabled != other.networkPartitionDetectionEnabled)
+      return false;
+    if (registrants == null) {
+      if (other.registrants != null)
+        return false;
+    } else if (!registrants.equals(other.registrants))
+      return false;
+    //as we are not sending requestId as part of the FindCoordinator response
+    /*if (requestId != other.requestId)
+      return false;*/
+    if (senderId == null) {
+      if (other.senderId != null)
+        return false;
+    } else if (!senderId.equals(other.senderId))
+      return false;
+    if (usePreferredCoordinators != other.usePreferredCoordinators)
+      return false;
+    if (view == null) {
+      if (other.view != null)
+        return false;
+    } else if (!view.equals(other.view))
+      return false;
+    return true;
+  }
+
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b6a73441/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/GMSLocator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/GMSLocator.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/GMSLocator.java
index aab9002..402940b 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/GMSLocator.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/GMSLocator.java
@@ -167,7 +167,7 @@ public class GMSLocator implements Locator, NetLocator {
       }
     } else if (request instanceof FindCoordinatorRequest) {
       FindCoordinatorRequest findRequest = (FindCoordinatorRequest)request;
-      
+      services.getMessenger().setPublicKey(findRequest.getMyPublicKey(), findRequest.getMemberID());
       if (findRequest.getMemberID() != null) {
         InternalDistributedMember coord = null;
 
@@ -228,9 +228,17 @@ public class GMSLocator implements Locator, NetLocator {
         }
         
         synchronized(registrants) {
+          byte[] coordPk = null; 
+          if(view != null) {
+            coordPk = (byte[])view.getPublicKey(coord);            
+          }
+          if (coordPk == null) {
+            coordPk = services.getMessenger().getPublickey(coord);
+          }
           response = new FindCoordinatorResponse(coord, localAddress,
               fromView, view, new HashSet<InternalDistributedMember>(registrants),
-              this.networkPartitionDetectionEnabled, this.usePreferredCoordinators);
+              this.networkPartitionDetectionEnabled, this.usePreferredCoordinators, 
+              coordPk);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b6a73441/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
index d70884a..3cd634a 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
@@ -39,6 +39,7 @@ import com.gemstone.gemfire.internal.CopyOnWriteHashSet;
 import com.gemstone.gemfire.internal.Version;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.security.AuthenticationFailedException;
+
 import org.apache.logging.log4j.Logger;
 
 import java.io.IOException;
@@ -361,8 +362,10 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
     } else {
       logger.info("Attempting to join the distributed system through coordinator " + coord + " using address " + this.localAddress);
       int port = services.getHealthMonitor().getFailureDetectionPort();
-      JoinRequestMessage req = new JoinRequestMessage(coord, this.localAddress, services.getAuthenticator().getCredentials(coord), port);
-      services.getMessenger().send(req, state.view);
+      JoinRequestMessage req = new JoinRequestMessage(coord, this.localAddress, services.getAuthenticator().getCredentials(coord), port, 
+          services.getMessenger().getRequestId());
+      //services.getMessenger().send(req, state.view);
+      services.getMessenger().send(req);
     }
 
     JoinResponseMessage response = null;
@@ -414,6 +417,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
         NetView v = response.getCurrentView();
         InternalDistributedMember coord = v.getCoordinator();
         if (searchState.alreadyTried.contains(coord)) {
+          searchState.view = response.getCurrentView();
           // we already sent join request to it..so lets wait some more time here
           // assuming we got this response immediately, so wait for same timeout here..
           long timeout = Math.max(services.getConfig().getMemberTimeout(), services.getConfig().getJoinTimeout() / 5);
@@ -421,7 +425,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
           response = joinResponse[0];
         } else {
           // try on this coordinator
-          searchState.possibleCoordinator = coord;
+          searchState.view = response.getCurrentView();
           response = null;
         }
         searchState.view = v;
@@ -467,7 +471,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
 
     if (incomingRequest.getMemberID().getVersionObject().compareTo(Version.CURRENT) < 0) {
       logger.warn("detected an attempt to start a peer using an older version of the product {}", incomingRequest.getMemberID());
-      JoinResponseMessage m = new JoinResponseMessage("Rejecting the attempt of a member using an older version");
+      JoinResponseMessage m = new JoinResponseMessage("Rejecting the attempt of a member using an older version", incomingRequest.getRequestId());
       m.setRecipient(incomingRequest.getMemberID());
       services.getMessenger().send(m);
       return;
@@ -480,7 +484,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
       rejection = e.getMessage();
     }
     if (rejection != null && rejection.length() > 0) {
-      JoinResponseMessage m = new JoinResponseMessage(rejection);
+      JoinResponseMessage m = new JoinResponseMessage(rejection, 0);
       m.setRecipient(incomingRequest.getMemberID());
       services.getMessenger().send(m);
       return;
@@ -632,13 +636,21 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
     logger.debug("JoinLeave is recording the request to be processed in the next membership view");
     synchronized (viewRequests) {
       viewRequests.add(request);
+      if (viewCreator != null) {
+        boolean joinResponseSent = viewCreator.informToPendingJoinRequests();
+
+        if (!joinResponseSent && request instanceof JoinRequestMessage) {
+          JoinRequestMessage jreq = (JoinRequestMessage) request;
+          // this will inform about cluster-secret key, as we have authenticated at this point
+          JoinResponseMessage response = new JoinResponseMessage(jreq.getSender(), services.getMessenger().getClusterSecretKey(), jreq.getRequestId());
+          services.getMessenger().send(response);
+        }
+      }
       viewRequests.notifyAll();
     }
-    if (viewCreator != null) {
-      viewCreator.informToPendingJoinRequests();
-    }
+    
   }
-
+  
   // for testing purposes, returns a copy of the view requests for verification
   List<DistributionMessage> getViewRequests() {
     synchronized (viewRequests) {
@@ -710,6 +722,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
       if (newView != null) {
         viewCreator.setInitialView(newView, newView.getNewMembers(), newView.getShutdownMembers(), newView.getCrashedMembers());
       }
+      services.getMessenger().initClusterKey();
       viewCreator.setDaemon(true);
       logger.info("ViewCreator starting on:" + localAddress);
       viewCreator.start();
@@ -752,7 +765,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
 
   private void sendJoinResponses(NetView newView, List<InternalDistributedMember> newMbrs) {
     for (InternalDistributedMember mbr : newMbrs) {
-      JoinResponseMessage response = new JoinResponseMessage(mbr, newView);
+      JoinResponseMessage response = new JoinResponseMessage(mbr, newView, 0);
       services.getMessenger().send(response);
     }
   }
@@ -770,14 +783,6 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
     }
   }
 
-  private void sendJoinResponses(List<InternalDistributedMember> newMbrs, NetView newView) {
-    for (InternalDistributedMember mbr : newMbrs) {
-      JoinResponseMessage response = new JoinResponseMessage(mbr, newView);
-      services.getMessenger().send(response);
-    }
-  }
-
-
   boolean prepareView(NetView view, List<InternalDistributedMember> newMembers) throws InterruptedException {
     return sendView(view, newMembers, true, this.prepareProcessor);
   }
@@ -841,7 +846,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
     pendingRemovals.removeAll(view.getCrashedMembers());
     viewReplyProcessor.initialize(id, responders);
     viewReplyProcessor.processPendingRequests(pendingLeaves, pendingRemovals);
-    services.getMessenger().send(msg, view);
+    addPublickeysToView(view);
+    services.getMessenger().send(msg);
 
     // only wait for responses during preparation
     if (preparing) {
@@ -868,13 +874,32 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
     return true;
   }
 
+  private void addPublickeysToView(NetView view) {
+    //TODO: is this check correct?
+    if (services != null && services.getConfig() != null && services.getConfig().getDistributionConfig() != null) {
+      String sDHAlgo = services.getConfig().getDistributionConfig().getSecurityClientDHAlgo();
+      if (sDHAlgo != null && !sDHAlgo.isEmpty()) {
+        List<InternalDistributedMember> mbrs = view.getMembers();
+        Iterator<InternalDistributedMember> itr = mbrs.iterator();
+
+        while (itr.hasNext()) {
+          InternalDistributedMember mbr = itr.next();
+          byte[] pk = services.getMessenger().getPublickey(mbr);
+          view.setPublicKey(mbr, pk);
+        }
+      }
+    }
+  }
   private void processViewMessage(final InstallViewMessage m) {
 
     NetView view = m.getView();
     
     if(currentView != null && !currentView.contains(m.getSender())) {
-      logger.info("Ignoring the view {} from member {}, which is not in my current view {} ", view, m.getSender(), currentView);
-      return;
+      if(this.preparedView == null || !this.preparedView.contains(m.getSender())) 
+      { 
+        logger.info("Ignoring the view {} from member {}, which is not in my current view {} ", view, m.getSender(), currentView);
+        return;
+      }
     }
 
     if (currentView != null && view.getViewId() < currentView.getViewId()) {
@@ -927,7 +952,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
 
   private void ackView(InstallViewMessage m) {
     if (!playingDead && m.getView().contains(m.getView().getCreator())) {
-      services.getMessenger().send(new ViewAckMessage(m.getSender(), m.getView().getViewId(), m.isPreparing()), m.getView());
+      services.getMessenger().send(new ViewAckMessage(m.getSender(), m.getView().getViewId(), m.isPreparing()));
     }
   }
 
@@ -968,7 +993,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
       return findCoordinatorFromView();
     }
 
-    FindCoordinatorRequest request = new FindCoordinatorRequest(this.localAddress, state.alreadyTried, state.viewId);
+    FindCoordinatorRequest request = new FindCoordinatorRequest(this.localAddress, state.alreadyTried, state.viewId, 
+        services.getMessenger().getPublickey(localAddress), services.getMessenger().getRequestId());
     Set<InternalDistributedMember> possibleCoordinators = new HashSet<InternalDistributedMember>();
     Set<InternalDistributedMember> coordinatorsWithView = new HashSet<InternalDistributedMember>();
 
@@ -988,6 +1014,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
           Object o = tcpClientWrapper.sendCoordinatorFindRequest(addr, request, connectTimeout);
           FindCoordinatorResponse response = (o instanceof FindCoordinatorResponse) ? (FindCoordinatorResponse) o : null;
           if (response != null) {
+            setCoordinatorPublicKey(response);
             state.locatorsContacted++;
             if (!state.hasContactedAJoinedLocator &&
                 response.getSenderId() != null && response.getSenderId().getVmViewId() >= 0) {
@@ -1084,16 +1111,35 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
     if (state.registrants != null) {
       recipients.addAll(state.registrants);
     }
-    recipients.remove(localAddress);
-    FindCoordinatorRequest req = new FindCoordinatorRequest(localAddress, state.alreadyTried, state.viewId);
-    req.setRecipients(v.getMembers());
+    recipients.remove(localAddress);    
 
+   // FindCoordinatorRequest req = new FindCoordinatorRequest(localAddress, state.alreadyTried, state.viewId, services.getMessenger().getPublickey(
+     //   localAddress), services.getMessenger().getRequestId());
+    //req.setRecipients(v.getMembers());
+    
     boolean testing = unitTesting.contains("findCoordinatorFromView");
     synchronized (state.responses) {
       if (!testing) {
         state.responses.clear();
       }
-      services.getMessenger().send(req);
+      
+      if (!services.getConfig().getDistributionConfig().getSecurityClientDHAlgo().isEmpty()) {
+        for (InternalDistributedMember mbr : v.getMembers()) {
+          Set<InternalDistributedMember> r = new HashSet<>();
+          r.add(mbr);
+          FindCoordinatorRequest req = new FindCoordinatorRequest(localAddress, state.alreadyTried, state.viewId, services.getMessenger().getPublickey(
+              localAddress), services.getMessenger().getRequestId());
+          req.setRecipients(r);
+
+          services.getMessenger().send(req, v);
+        }
+      } else {
+        FindCoordinatorRequest req = new FindCoordinatorRequest(localAddress, state.alreadyTried, state.viewId, services.getMessenger().getPublickey(
+            localAddress), services.getMessenger().getRequestId());
+        req.setRecipients(v.getMembers());
+
+        services.getMessenger().send(req, v);
+      }
       try {
         if (!testing) {
           state.responses.wait(DISCOVERY_TIMEOUT);
@@ -1144,8 +1190,16 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
   private void processJoinResponse(JoinResponseMessage rsp) {
     synchronized (joinResponse) {
       if (!this.isJoined) {
-        joinResponse[0] = rsp;
-        joinResponse.notifyAll();
+        //1. our joinRequest rejected.
+        //2. Member which was coordinator but just now some other member became coordinator
+        //3. we got message with secret key, but still view is coming and that will inform the joining thread
+        if (rsp.getRejectionMessage() != null || rsp.getCurrentView() != null) {
+          joinResponse[0] = rsp;
+          joinResponse.notifyAll();
+        } else {
+          //we got the secret key, so let's add it
+          services.getMessenger().setClusterSecretKey(rsp.getSecretPk());
+        }
       }
     }
   }
@@ -1170,9 +1224,11 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
     FindCoordinatorResponse resp;
     if (this.isJoined) {
       NetView v = currentView;
-      resp = new FindCoordinatorResponse(v.getCoordinator(), localAddress);
+      resp = new FindCoordinatorResponse(v.getCoordinator(), localAddress, 
+          services.getMessenger().getPublickey(v.getCoordinator()), req.getRequestId());
     } else {
-      resp = new FindCoordinatorResponse(localAddress, localAddress);
+      resp = new FindCoordinatorResponse(localAddress, localAddress, 
+          services.getMessenger().getPublickey(localAddress), req.getRequestId());
     }
     resp.setRecipient(req.getMemberID());
     services.getMessenger().send(resp);
@@ -1183,6 +1239,12 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
       searchState.responses.add(resp);
       searchState.responses.notifyAll();
     }
+    setCoordinatorPublicKey(resp);
+  }
+  
+  private void setCoordinatorPublicKey(FindCoordinatorResponse response) {
+    if (response.getCoordinator() != null && response.getCoordinatorPublicKey() != null)
+      services.getMessenger().setPublicKey(response.getCoordinatorPublicKey(), response.getCoordinator());
   }
 
   private void processNetworkPartitionMessage(NetworkPartitionMessage msg) {
@@ -1993,36 +2055,40 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
       }
     }
     
-    synchronized void informToPendingJoinRequests() {
+    synchronized boolean informToPendingJoinRequests() {
+      boolean joinResponseSent = false;
       if (!shutdown) {
-        return;
+        return joinResponseSent;
       }
-
       ArrayList<DistributionMessage> requests = new ArrayList<>();
       synchronized (viewRequests) {
         if (viewRequests.size() > 0) {
           requests.addAll(viewRequests);
         } else {
-          return;
+          return joinResponseSent;
         }
         viewRequests.clear();
       }
 
+      
       for (DistributionMessage msg : requests) {
         switch (msg.getDSFID()) {
         case JOIN_REQUEST:
-          logger.info("Informing to pending join requests {}", msg);
-
+      
           NetView v = currentView;
+          logger.info("Informing to pending join requests {} myid {} coord {}", msg, localAddress, v.getCoordinator());
           if (!v.getCoordinator().equals(localAddress)) {
+            joinResponseSent = true;
             //lets inform that coordinator has been changed
-            JoinResponseMessage jrm = new JoinResponseMessage(((JoinRequestMessage) msg).getMemberID(), v);
+            JoinResponseMessage jrm = new JoinResponseMessage(((JoinRequestMessage) msg).getMemberID(), v, ((JoinRequestMessage) msg).getRequestId());
             services.getMessenger().send(jrm);
           }
         default:
           break;
         }
       }
+      
+      return joinResponseSent;
     }
 
     /**
@@ -2033,7 +2099,6 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
     void createAndSendView(List<DistributionMessage> requests) throws InterruptedException {
       List<InternalDistributedMember> joinReqs = new ArrayList<>(10);
       Map<InternalDistributedMember, Integer> joinPorts = new HashMap<>(10);
-      Map<InternalDistributedMember, Object> joinKeys = new HashMap<>(10);
       Set<InternalDistributedMember> leaveReqs = new HashSet<>(10);
       List<InternalDistributedMember> removalReqs = new ArrayList<>(10);
       List<String> removalReasons = new ArrayList<String>(10);
@@ -2067,7 +2132,6 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
           if (!joinReqs.contains(mbr)) {
             joinReqs.add(mbr);
             joinPorts.put(mbr, port);
-            joinKeys.put(mbr, jmsg.getPublicKey());
           }
           break;
         case LEAVE_REQUEST_MESSAGE:
@@ -2137,7 +2201,6 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
         for (InternalDistributedMember mbr : joinReqs) {
           if (mbrs.contains(mbr)) {
             newView.setFailureDetectionPort(mbr, joinPorts.get(mbr));
-            newView.setPublicKey(mbr, joinKeys.get(mbr));
           }
         }
         if (currentView != null) {
@@ -2161,7 +2224,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
         return;
       }
 
-      sendJoinResponses(joinReqs, newView);
+      //we already sent the join response when we got the join request
+      //sendJoinResponses(newView, joinReqs);
 
       // send removal messages before installing the view so we stop
       // getting messages from members that have been kicked out
@@ -2289,7 +2353,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
 
       // we also send a join response so that information like the multicast message digest
       // can be transmitted to the new members w/o including it in the view message
-      sendJoinResponses(newView, joinReqs);
+      //we already sent the join response when we got the join request
+      //sendJoinResponses(newView, joinReqs);
 
       if (markViewCreatorForShutdown && getViewCreator() != null) {
         shutdown = true;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b6a73441/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/InstallViewMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/InstallViewMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/InstallViewMessage.java
index c41584f..224fef1 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/InstallViewMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/InstallViewMessage.java
@@ -110,4 +110,29 @@ public class InstallViewMessage extends HighPriorityDistributionMessage {
              +")";
   }
 
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    InstallViewMessage other = (InstallViewMessage) obj;
+    if (credentials == null) {
+      if (other.credentials != null)
+        return false;
+    } else if (!credentials.equals(other.credentials))
+      return false;
+    if (kind != other.kind)
+      return false;
+    if (previousViewId != other.previousViewId)
+      return false;
+    if (view == null) {
+      if (other.view != null)
+        return false;
+    } else if (!view.equals(other.view))
+      return false;
+    return true;
+  }  
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b6a73441/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinRequestMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinRequestMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinRequestMessage.java
index 5545935..b282daa 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinRequestMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinRequestMessage.java
@@ -30,25 +30,25 @@ public class JoinRequestMessage extends HighPriorityDistributionMessage {
   private InternalDistributedMember memberID;
   private Object credentials;
   private int failureDetectionPort = -1;
-  private Object publicKey;
-  
+  private int requestId;
+    
   public JoinRequestMessage(InternalDistributedMember coord,
-                            InternalDistributedMember id, Object credentials, int fdPort) {
+                            InternalDistributedMember id, Object credentials, int fdPort, int requestId) {
     super();
     setRecipient(coord);
     this.memberID = id;
     this.credentials = credentials;
-    this.publicKey = null;
     this.failureDetectionPort = fdPort;
+    this.requestId = requestId;
   }
   public JoinRequestMessage() {
     // no-arg constructor for serialization
   }
 
-  public void setPublicKey(Object key) {
-    this.publicKey = key;
+  public int getRequestId() {
+    return requestId;
   }
-
+  
   @Override
   public int getDSFID() {
     return JOIN_REQUEST;
@@ -67,10 +67,6 @@ public class JoinRequestMessage extends HighPriorityDistributionMessage {
     return credentials;
   }
 
-  public Object getPublicKey() {
-    return publicKey;
-  }
-  
   @Override
   public String toString() {
     return getShortClassName() + "(" + memberID + (credentials==null? ")" : "; with credentials)") + " failureDetectionPort:" + failureDetectionPort;
@@ -85,24 +81,49 @@ public class JoinRequestMessage extends HighPriorityDistributionMessage {
   public void toData(DataOutput out) throws IOException {
     DataSerializer.writeObject(memberID, out);
     DataSerializer.writeObject(credentials, out);
-    DataSerializer.writeObject(publicKey, out);
     DataSerializer.writePrimitiveInt(failureDetectionPort, out);
     // preserve the multicast setting so the receiver can tell
     // if this is a mcast join request
     out.writeBoolean(getMulticast());
+    out.writeInt(requestId);
   }
 
   @Override
   public void fromData(DataInput in) throws IOException, ClassNotFoundException {
     memberID = DataSerializer.readObject(in);
     credentials = DataSerializer.readObject(in);
-    publicKey = DataSerializer.readObject(in);
     failureDetectionPort = DataSerializer.readPrimitiveInt(in);
     setMulticast(in.readBoolean());
+    requestId = in.readInt();
   }
 
   public int getFailureDetectionPort() {
     return failureDetectionPort;
   }
-
+  
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    JoinRequestMessage other = (JoinRequestMessage) obj;
+    if (credentials == null) {
+      if (other.credentials != null)
+        return false;
+    } else if (!credentials.equals(other.credentials))
+      return false;
+    if (failureDetectionPort != other.failureDetectionPort)
+      return false;
+    if (memberID == null) {
+      if (other.memberID != null)
+        return false;
+    } else if (!memberID.equals(other.memberID))
+      return false;
+    if (requestId != other.requestId)
+      return false;
+    return true;
+  }  
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b6a73441/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinResponseMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinResponseMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinResponseMessage.java
index ad9c319..ff20a4e 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinResponseMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinResponseMessage.java
@@ -20,6 +20,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
@@ -37,20 +38,39 @@ public class JoinResponseMessage extends HighPriorityDistributionMessage {
   private String rejectionMessage;
   private InternalDistributedMember memberID;
   private byte[] messengerData;
-  
-  public JoinResponseMessage(InternalDistributedMember memberID, NetView view) {
+  private int requestId;
+  private byte[] secretPk;
+    
+  public JoinResponseMessage(InternalDistributedMember memberID, NetView view, int requestId) {
     this.currentView = view;
     this.memberID = memberID;
+    this.requestId = requestId;
     setRecipient(memberID);
   }
   
-  public JoinResponseMessage(String rejectionMessage) {
+  public JoinResponseMessage(InternalDistributedMember memberID, byte[] sPk, int requestId) {
+    this.memberID = memberID;
+    this.requestId = requestId;
+    this.secretPk = sPk;
+    setRecipient(memberID);
+  }
+  
+  public JoinResponseMessage(String rejectionMessage, int requestId) {
     this.rejectionMessage = rejectionMessage;
+    this.requestId = requestId;
   }
   
   public JoinResponseMessage() {
     // no-arg constructor for serialization
   }
+  
+  public byte[] getSecretPk() {
+    return secretPk;
+  }
+  
+  public int getRequestId() {
+    return requestId;
+  }
 
   public NetView getCurrentView() {
     return currentView;
@@ -101,6 +121,7 @@ public class JoinResponseMessage extends HighPriorityDistributionMessage {
     DataSerializer.writeObject(memberID, out);
     DataSerializer.writeString(rejectionMessage, out);
     DataSerializer.writeByteArray(messengerData, out);
+    DataSerializer.writeByteArray(secretPk, out);
   }
 
   @Override
@@ -109,6 +130,42 @@ public class JoinResponseMessage extends HighPriorityDistributionMessage {
     memberID = DataSerializer.readObject(in);
     rejectionMessage = DataSerializer.readString(in);
     messengerData = DataSerializer.readByteArray(in);
+    secretPk = DataSerializer.readByteArray(in);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    JoinResponseMessage other = (JoinResponseMessage) obj;
+    if (currentView == null) {
+      if (other.currentView != null)
+        return false;
+    } else if (!currentView.equals(other.currentView))
+      return false;
+    if (memberID == null) {
+      if (other.memberID != null)
+        return false;
+    } else if (!memberID.equals(other.memberID))
+      return false;
+    if (!Arrays.equals(messengerData, other.messengerData))
+      return false;
+    if (rejectionMessage == null) {
+      if (other.rejectionMessage != null)
+        return false;
+    } else if (!rejectionMessage.equals(other.rejectionMessage))
+      return false;
+    //requestId is not compared, as we are not sending it as part of JoinResponse
+    /*if (requestId != other.requestId)
+      return false;*/
+    if (!Arrays.equals(secretPk, other.secretPk))
+      return false;
+    return true;
   }
 
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b6a73441/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSEncrypt.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSEncrypt.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSEncrypt.java
index f307290..047bb03 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSEncrypt.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSEncrypt.java
@@ -23,7 +23,6 @@ import java.security.spec.X509EncodedKeySpec;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import javax.crypto.Cipher;
@@ -37,18 +36,14 @@ import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedM
 import com.gemstone.gemfire.distributed.internal.membership.NetView;
 import com.gemstone.gemfire.distributed.internal.membership.gms.Services;
 
-import org.apache.logging.log4j.Logger;
 
 import com.gemstone.gemfire.distributed.internal.DistributionConfig;
-import com.gemstone.gemfire.internal.logging.LogService;
 
 public class GMSEncrypt implements Cloneable {
 
   public static long encodingsPerformed;
   public static long decodingsPerformed;
 
-  private static final Logger logger = LogService.getLogger();
-
   // Parameters for the Diffie-Hellman key exchange
   private static final BigInteger dhP = new BigInteger("13528702063991073999718992897071702177131142188276542919088770094024269" + "73079899070080419278066109785292538223079165925365098181867673946"
       + "34756714063947534092593553024224277712367371302394452615862654308" + "11180902979719649450105660478776364198726078338308557022096810447" + "3500348898008043285865193451061481841186553");
@@ -74,27 +69,27 @@ public class GMSEncrypt implements Cloneable {
 
   private ClusterEncryptor clusterEncryptor;
 
-  protected void installView(NetView view) throws Exception {
+  protected void installView(NetView view) {
     this.view = view;
     this.view.setPublicKey(services.getJoinLeave().getMemberID(), getPublicKeyBytes());
   }
 
-  protected void installView(NetView view, InternalDistributedMember mbr) throws Exception {
+  protected void installView(NetView view, InternalDistributedMember mbr) {
     this.view = view;
-    // this.view.setPublicKey(mbr, getPublicKeyBytes());
-    // TODO remove ciphers for departed members
-    // addClusterKey();
   }
 
-  protected byte[] getSecretBytes() {
+  protected byte[] getClusterSecretKey() {
     return this.clusterEncryptor.secretBytes;
   }
 
-  protected synchronized void addClusterKey() throws Exception {
-    this.clusterEncryptor = new ClusterEncryptor(this);
+  protected synchronized void initClusterSecretKey() throws Exception {
+    if(this.clusterEncryptor == null) {
+      this.clusterEncryptor = new ClusterEncryptor(this);
+    }
   }
 
-  protected synchronized void addClusterKey(byte[] secretBytes) throws Exception {
+  protected synchronized void addClusterKey(byte[] secretBytes) {
+    //TODO we are resetting here, in case there is some race
     this.clusterEncryptor = new ClusterEncryptor(secretBytes);
   }
 
@@ -124,6 +119,11 @@ public class GMSEncrypt implements Cloneable {
   public byte[] decryptData(byte[] data) throws Exception {
     return this.clusterEncryptor.decryptBytes(data);
   }
+  
+  public byte[] decryptData(byte[] data, byte[] pkBytes) throws Exception {
+    PeerEncryptor encryptor = new PeerEncryptor(pkBytes);
+    return encryptor.decryptBytes(data);    
+  }
 
   public byte[] encryptData(byte[] data) throws Exception {
     return this.clusterEncryptor.encryptBytes(data);
@@ -132,6 +132,26 @@ public class GMSEncrypt implements Cloneable {
   protected byte[] getPublicKeyBytes() {
     return dhPublicKey.getEncoded();
   }
+  
+  protected byte[] getPublicKey(InternalDistributedMember member) {
+    try {
+      InternalDistributedMember localMbr = services.getMessenger().getMemberID();
+      if (localMbr != null && localMbr.equals(member)) {
+        return this.dhPublicKey.getEncoded();// local one
+      }
+      return getPeerEncryptor(member).peerPublicKey.getEncoded();
+    } catch (Exception e) {
+      throw new RuntimeException("Not found public key for member " + member, e);
+    }
+  }
+  
+  protected void setPublicKey(byte[] publickey, InternalDistributedMember mbr) {
+    try {
+      createPeerEncryptor(mbr, publickey);
+    }catch(Exception e) {
+      throw new RuntimeException("Unable to create peer encryptor " +  mbr, e);
+    }
+  }
 
   @Override
   protected GMSEncrypt clone() throws CloneNotSupportedException {
@@ -142,7 +162,6 @@ public class GMSEncrypt implements Cloneable {
 
       X509EncodedKeySpec x509KeySpec = new X509EncodedKeySpec(this.dhPublicKey.getEncoded());
       KeyFactory keyFact = KeyFactory.getInstance("DH");
-      // PublicKey pubKey = keyFact.generatePublic(x509KeySpec);
       gmsEncrypt.dhPublicKey = keyFact.generatePublic(x509KeySpec);
       final String format = this.dhPrivateKey.getFormat();
       System.out.println("private key format " + format);
@@ -180,16 +199,20 @@ public class GMSEncrypt implements Cloneable {
     }
   }
 
-  protected synchronized PeerEncryptor getPeerEncryptor(InternalDistributedMember member) throws Exception {
+  protected PeerEncryptor getPeerEncryptor(InternalDistributedMember member) throws Exception {
     PeerEncryptor result = memberToPeerEncryptor.get(member);
     if (result == null) {
-      result = createPeerEncryptor(member);
+      synchronized (this) {
+        result = memberToPeerEncryptor.get(member);
+        if (result == null) {
+          result = createPeerEncryptor(member, (byte[]) view.getPublicKey(member));
+        }
+      }
     }
     return result;
   }
 
-  private PeerEncryptor createPeerEncryptor(InternalDistributedMember member) throws Exception {
-    byte[] peerKeyBytes = (byte[]) view.getPublicKey(member);
+  private PeerEncryptor createPeerEncryptor(InternalDistributedMember member, byte[] peerKeyBytes) throws Exception {
     PeerEncryptor result = new PeerEncryptor(peerKeyBytes);
     memberToPeerEncryptor.put(member, result);
     return result;
@@ -274,7 +297,7 @@ public class GMSEncrypt implements Cloneable {
       this.peerPublicKey = getPublicKey(peerPublicKeyBytes);
     }
 
-    public byte[] encryptBytes(byte[] data) throws Exception {
+    public synchronized byte[] encryptBytes(byte[] data) throws Exception {
       String algo = null;
       if (this.peerSKAlgo != null) {
         algo = this.peerSKAlgo;
@@ -295,7 +318,7 @@ public class GMSEncrypt implements Cloneable {
       return encrypt;
     }
 
-    public byte[] decryptBytes(byte[] data) throws Exception {
+    public synchronized byte[] decryptBytes(byte[] data) throws Exception {
       String algo = null;
       if (this.peerSKAlgo != null) {
         algo = this.peerSKAlgo;
@@ -352,7 +375,7 @@ public class GMSEncrypt implements Cloneable {
     int blocksize = getBlockSize(dhSKAlgo);
 
     if (keysize == -1 || blocksize == -1) {
-      // TODO how should we do here
+      // TODO what should we do here, should we just throw runtime exception?
       /* SecretKey sKey = ka.generateSecret(dhSKAlgo);
        * encrypt = Cipher.getInstance(dhSKAlgo);
        * encrypt.init(Cipher.ENCRYPT_MODE, sKey); */
@@ -403,7 +426,7 @@ public class GMSEncrypt implements Cloneable {
     int blocksize = getBlockSize(dhSKAlgo);
 
     if (keysize == -1 || blocksize == -1) {
-      // TODO: how to do here
+      // TODO: what should we do here, should we just throw runtime exception?
       /* SecretKey sKey = ka.generateSecret(dhSKAlgo);
        * decrypt = Cipher.getInstance(dhSKAlgo);
        * decrypt.init(Cipher.DECRYPT_MODE, sKey); */
@@ -431,8 +454,6 @@ public class GMSEncrypt implements Cloneable {
       SecretKey sKey = ka.generateSecret(dhSKAlgo);
       return sKey.getEncoded();
     } else {
-      String algoStr = getDhAlgoStr(dhSKAlgo);
-
       return ka.generateSecret();
     }
   }
@@ -453,16 +474,13 @@ public class GMSEncrypt implements Cloneable {
 
   /***
    * this will hold the common key for cluster
-   * that will be created using publickey of all the members..
-   *
    */
   protected class ClusterEncryptor {
     byte[] secretBytes;
+    //TODO: need to check whether this is thread safe
     Cipher encrypt;
     Cipher decrypt;
-    int viewId;
-    Set<InternalDistributedMember> mbrs;
-
+   
     public ClusterEncryptor(GMSEncrypt other) throws Exception {
       GMSEncrypt mine = new GMSEncrypt(other.services);
       this.secretBytes = GMSEncrypt.generateSecret(mine.dhSKAlgo, mine.dhPrivateKey, other.dhPublicKey);
@@ -472,7 +490,7 @@ public class GMSEncrypt implements Cloneable {
       this.secretBytes = sb;
     }
 
-    public byte[] encryptBytes(byte[] data) throws Exception {
+    public synchronized byte[] encryptBytes(byte[] data) throws Exception {
       String algo = dhSKAlgo;
       return GMSEncrypt.encryptBytes(data, getEncryptCipher(algo));
     }
@@ -491,7 +509,7 @@ public class GMSEncrypt implements Cloneable {
       return encrypt;
     }
 
-    public byte[] decryptBytes(byte[] data) throws Exception {
+    public synchronized byte[] decryptBytes(byte[] data) throws Exception {
       String algo = dhSKAlgo;
       Cipher c = getDecryptCipher(algo);
       return GMSEncrypt.decryptBytes(data, c);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b6a73441/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
index cba5d5f..eedfc5f 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
@@ -19,10 +19,13 @@ package com.gemstone.gemfire.distributed.internal.membership.gms.messenger;
 import static com.gemstone.gemfire.distributed.internal.membership.gms.GMSUtil.replaceStrings;
 import static com.gemstone.gemfire.internal.DataSerializableFixedID.JOIN_REQUEST;
 import static com.gemstone.gemfire.internal.DataSerializableFixedID.JOIN_RESPONSE;
+import static com.gemstone.gemfire.internal.DataSerializableFixedID.FIND_COORDINATOR_REQ;
+import static com.gemstone.gemfire.internal.DataSerializableFixedID.FIND_COORDINATOR_RESP;
 
 import java.io.BufferedReader;
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
@@ -33,12 +36,15 @@ import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
+import java.util.WeakHashMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -80,10 +86,13 @@ import com.gemstone.gemfire.distributed.internal.membership.gms.GMSMember;
 import com.gemstone.gemfire.distributed.internal.membership.gms.Services;
 import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.MessageHandler;
 import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.Messenger;
+import com.gemstone.gemfire.distributed.internal.membership.gms.locator.FindCoordinatorRequest;
+import com.gemstone.gemfire.distributed.internal.membership.gms.locator.FindCoordinatorResponse;
 import com.gemstone.gemfire.distributed.internal.membership.gms.messages.JoinRequestMessage;
 import com.gemstone.gemfire.distributed.internal.membership.gms.messages.JoinResponseMessage;
 import com.gemstone.gemfire.internal.ClassPathLoader;
 import com.gemstone.gemfire.internal.HeapDataOutputStream;
+import com.gemstone.gemfire.internal.InternalDataSerializer;
 import com.gemstone.gemfire.internal.OSProcess;
 import com.gemstone.gemfire.internal.SocketCreator;
 import com.gemstone.gemfire.internal.Version;
@@ -256,11 +265,11 @@ public class JGroupsMessenger implements Messenger {
     if ( !dc.getSecurityClientDHAlgo().isEmpty() ) {
       try {
         this.encrypt = new GMSEncrypt(services);
+        logger.info("Initializing GMSEncrypt ");
       } catch (Exception e) {
         throw new GemFireConfigException("problem initializing encryption protocol", e);
       }
     }
-
   }
 
   @Override
@@ -389,12 +398,7 @@ public class JGroupsMessenger implements Messenger {
     addressesWithIoExceptionsProcessed.clear();
 
     if (encrypt != null) {
-      try {
-        encrypt.installView(v);
-      } catch (Exception e) {
-        // TODO Auto-generated catch block
-        e.printStackTrace();
-      }
+      encrypt.installView(v);
     }
   }
   
@@ -607,20 +611,7 @@ public class JGroupsMessenger implements Messenger {
   public Set<InternalDistributedMember> sendUnreliably(DistributionMessage msg) {
     return send(msg, false);
   }
-
-  @Override
-  public Set<InternalDistributedMember> send(DistributionMessage msg, NetView alternateView) {
-    if (this.encrypt != null) {
-      try {
-        this.encrypt.installView(alternateView);
-      } catch (Exception e) {
-        // TODO Auto-generated catch block
-        e.printStackTrace();
-      }
-    }
-    return send(msg, true);
-  }
-
+    
   @Override
   public Set<InternalDistributedMember> send(DistributionMessage msg) {
     return send(msg, true);
@@ -670,7 +661,7 @@ public class JGroupsMessenger implements Messenger {
     if (useMcast) {
 
       long startSer = theStats.startMsgSerialization();
-      Message jmsg = createJGMessage(msg, local, null, Version.CURRENT_ORDINAL);
+      Message jmsg = createJGMessage(msg, local, Version.CURRENT_ORDINAL);
       theStats.endMsgSerialization(startSer);
 
       Exception problem = null;
@@ -712,7 +703,7 @@ public class JGroupsMessenger implements Messenger {
     } // useMcast
     else { // ! useMcast
       int len = destinations.length;
-      List<InternalDistributedMember> calculatedMembers; // explicit list of members
+      List<GMSMember> calculatedMembers; // explicit list of members
       int calculatedLen; // == calculatedMembers.len
       if (len == 1 && destinations[0] == DistributionMessage.ALL_RECIPIENTS) { // send to all
         // Grab a copy of the current membership
@@ -720,40 +711,51 @@ public class JGroupsMessenger implements Messenger {
 
         // Construct the list
         calculatedLen = v.size();
-        calculatedMembers = new LinkedList<InternalDistributedMember>();
+        calculatedMembers = new LinkedList<GMSMember>();
         for (int i = 0; i < calculatedLen; i ++) {
           InternalDistributedMember m = (InternalDistributedMember)v.get(i);
-          calculatedMembers.add(m);
+          calculatedMembers.add((GMSMember)m.getNetMember());
         }
       } // send to all
       else { // send to explicit list
         calculatedLen = len;
-        calculatedMembers = new LinkedList<>();
+        calculatedMembers = new LinkedList<GMSMember>();
         for (int i = 0; i < calculatedLen; i ++) {
-          calculatedMembers.add(destinations[i]);
+          calculatedMembers.add((GMSMember)destinations[i].getNetMember());
         }
       } // send to explicit list
       Int2ObjectOpenHashMap<Message> messages = new Int2ObjectOpenHashMap<>();
       long startSer = theStats.startMsgSerialization();
-
-      boolean encode = (encrypt != null);
-
       boolean firstMessage = true;
+      for (Iterator<GMSMember> it=calculatedMembers.iterator(); it.hasNext(); ) {
+        GMSMember mbr = it.next();
+        short version = mbr.getVersionOrdinal();
+        if ( !messages.containsKey(version) ) {
+          Message jmsg = createJGMessage(msg, local, version);
+          messages.put(version, jmsg);
+          if (firstMessage) {
+            theStats.incSentBytes(jmsg.getLength());
+            firstMessage = false;
+          }
+        }
+      }
+      theStats.endMsgSerialization(startSer);
       Collections.shuffle(calculatedMembers);
       int i=0;
-      for (InternalDistributedMember mbr: calculatedMembers) {
-        short version = mbr.getNetMember().getVersionOrdinal();
+      for (GMSMember mbr: calculatedMembers) {
         JGAddress to = new JGAddress(mbr);
-        Message jmsg = createJGMessage(msg, local, mbr, version);
+        short version = mbr.getVersionOrdinal();
+        Message jmsg = (Message)messages.get(version);
         Exception problem = null;
         try {
+          Message tmp = (i < (calculatedLen-1)) ? jmsg.copy(true) : jmsg;
           if (!reliably) {
             jmsg.setFlag(Message.Flag.NO_RELIABILITY);
           }
-          jmsg.setDest(to);
-          jmsg.setSrc(this.jgAddress);
+          tmp.setDest(to);
+          tmp.setSrc(this.jgAddress);
           logger.trace("Unicasting to {}", to);
-          myChannel.send(jmsg);
+          myChannel.send(tmp);
         }
         catch (Exception e) {
           problem = e;
@@ -810,7 +812,7 @@ public class JGroupsMessenger implements Messenger {
    * @param version the version of the recipient
    * @return the new message
    */
-  Message createJGMessage(DistributionMessage gfmsg, JGAddress src, InternalDistributedMember recipient, short version) {
+  Message createJGMessage(DistributionMessage gfmsg, JGAddress src, short version) {
     if(gfmsg instanceof DirectReplyMessage) {
       ((DirectReplyMessage) gfmsg).registerProcessor();
     }
@@ -820,35 +822,17 @@ public class JGroupsMessenger implements Messenger {
     setMessageFlags(gfmsg, msg);
     try {
       long start = services.getStatistics().startMsgSerialization();
-      HeapDataOutputStream out_stream =
-        new HeapDataOutputStream(Version.fromOrdinalOrCurrent(version));
+      byte[] messageBytes = null;
+      HeapDataOutputStream out_stream = new HeapDataOutputStream(Version.fromOrdinalOrCurrent(version));
       Version.CURRENT.writeOrdinal(out_stream, true);
-      DataSerializer.writeObject(this.localAddress.getNetMember(), out_stream);
-      boolean encode = encrypt != null && recipient != null;
-      if (encode) {
-        // Coordinator doesn't know our publicKey for a JoinRequest
-        if (gfmsg.getDSFID() == JOIN_REQUEST || gfmsg.getDSFID() == JOIN_RESPONSE) {
-          encode = false;
-        }
-      }
-      if (encode) {
-        logger.info("encoding {}", gfmsg);
-        try {
-          out_stream.writeBoolean(true); // TODO we should have flag bits
-          HeapDataOutputStream out_stream2 =
-            new HeapDataOutputStream(Version.fromOrdinalOrCurrent(version));
-          DataSerializer.writeObject(gfmsg, out_stream2);
-          byte[] payload = out_stream2.toByteArray();
-          payload = encrypt.encryptData(payload, recipient);
-          DataSerializer.writeByteArray(payload, out_stream);
-        } catch (Exception e) {
-          throw new GemFireIOException("unable to send message", e);
-        }
+      if(encrypt != null) {
+        out_stream.writeBoolean(true);
+        writeEncryptedMessage(gfmsg, version, out_stream);                
       } else {
-        logger.info("not encoding {}", gfmsg);
         out_stream.writeBoolean(false);
-        DataSerializer.writeObject(gfmsg, out_stream);
+        serializeMessage(gfmsg, out_stream);
       }
+      
       msg.setBuffer(out_stream.toByteArray());
       services.getStatistics().endMsgSerialization(start);
     }
@@ -862,9 +846,82 @@ public class JGroupsMessenger implements Messenger {
         ioe.initCause(ex);
         throw ioe;
       }
+    } catch(Exception ex){
+      logger.warn("Error serializing message", ex);
+      GemFireIOException ioe = new
+          GemFireIOException("Error serializing message");
+        ioe.initCause(ex.getCause());
+        throw ioe;
     }
     return msg;
   }
+  
+  void writeEncryptedMessage(DistributionMessage gfmsg, short version, HeapDataOutputStream out) throws Exception {
+    InternalDataSerializer.writeDSFIDHeader(gfmsg.getDSFID(), out);
+    byte[] pk = null;
+    int requestId = 0;
+    InternalDistributedMember pkMbr = null;
+    switch (gfmsg.getDSFID()) {
+    case FIND_COORDINATOR_REQ:
+    case JOIN_REQUEST:
+      // need to append my own public key (PK)
+      pk = encrypt.getPublicKey(localAddress);
+      
+      pkMbr = gfmsg.getRecipients()[0];      
+      requestId = getRequestId(gfmsg, true);
+      break;
+    case FIND_COORDINATOR_RESP:
+    case JOIN_RESPONSE:
+      pkMbr = gfmsg.getRecipients()[0];
+      requestId = getRequestId(gfmsg, false);
+    default:
+      break;
+    }
+    logger.debug("writeEncryptedMessage gfmsg.getDSFID() = {}  for {} with requestid  {}", gfmsg.getDSFID(), pkMbr, requestId);
+    out.writeInt(requestId);
+    if (pk != null) {      
+      InternalDataSerializer.writeByteArray(pk, out);
+    }
+
+    HeapDataOutputStream out_stream = new HeapDataOutputStream(Version.fromOrdinalOrCurrent(version));
+    byte[] messageBytes = serializeMessage(gfmsg, out_stream);
+    
+    if (pkMbr != null) {
+      // using the member's public key
+      messageBytes = encrypt.encryptData(messageBytes, pkMbr);
+    } else {
+      // using cluster secret key
+      messageBytes = encrypt.encryptData(messageBytes);
+    }
+    InternalDataSerializer.writeByteArray(messageBytes, out);
+  }
+  
+  int getRequestId(DistributionMessage gfmsg, boolean add) {
+    int requestId = 0;
+    if (gfmsg instanceof FindCoordinatorRequest) {
+      requestId = ((FindCoordinatorRequest) gfmsg).getRequestId();
+    } else if (gfmsg instanceof JoinRequestMessage) {
+      requestId = ((JoinRequestMessage) gfmsg).getRequestId();
+    } else if (gfmsg instanceof FindCoordinatorResponse) {
+      requestId = ((FindCoordinatorResponse) gfmsg).getRequestId();
+    } else if (gfmsg instanceof JoinResponseMessage) {
+      requestId = ((JoinResponseMessage) gfmsg).getRequestId();
+    }
+
+    if (add) {
+      addRequestId(requestId, gfmsg.getRecipients()[0]);
+    }
+
+    return requestId;
+  }
+  
+  byte[] serializeMessage(DistributionMessage gfmsg, HeapDataOutputStream out_stream) throws IOException {
+    
+    DataSerializer.writeObject(this.localAddress.getNetMember(), out_stream);
+    DataSerializer.writeObject(gfmsg, out_stream);
+    
+    return out_stream.toByteArray();
+  }
 
   void setMessageFlags(DistributionMessage gfmsg, Message msg) {
     // GemFire uses its own reply processors so there is no need
@@ -905,17 +962,14 @@ public class JGroupsMessenger implements Messenger {
       // as STABLE_GOSSIP
       logger.trace("message length is zero - ignoring");
       return null;
-    }
-
-    InternalDistributedMember sender = null;
+    }    
 
     Exception problem = null;
     byte[] buf = jgmsg.getRawBuffer();
     try {
       long start = services.getStatistics().startMsgDeserialization();
       
-
-      DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf,
+      DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf, 
           jgmsg.getOffset(), jgmsg.getLength()));
 
       short ordinal = Version.readOrdinal(dis);
@@ -924,44 +978,33 @@ public class JGroupsMessenger implements Messenger {
         dis = new VersionedDataInputStream(dis, Version.fromOrdinalNoThrow(
             ordinal, true));
       }
-
-      GMSMember m = DataSerializer.readObject(dis);
-
-      sender = getMemberFromView(m, ordinal);
-
-      boolean encrypted = dis.readBoolean();
-
-      if (encrypted && encrypt != null) {
-        byte[] payload = DataSerializer.readByteArray(dis);
-        try {
-          payload = encrypt.decryptData(payload, sender);
-          dis = new DataInputStream(new ByteArrayInputStream(payload));
-          if (ordinal < Version.CURRENT_ORDINAL) {
-            dis = new VersionedDataInputStream(dis, Version.fromOrdinalNoThrow(
-              ordinal, true));
-          }
-        } catch (Exception e) {
-          throw new GemFireIOException("unable to receive message", e);
-        }
-      }
-
-      result = DataSerializer.readObject(dis);
-
-      DistributionMessage dm = (DistributionMessage)result;
+    
+      // read the flag that says whether the payload is encrypted
+      boolean isEncrypted = dis.readBoolean();
+      
+      if(isEncrypted && encrypt == null) {
+        throw new GemFireConfigException("Got remote message as encrypted");
+      } 
       
-      // JoinRequestMessages are sent with an ID that may have been
-      // reused from a previous life by way of auto-reconnect,
-      // so we don't want to find a canonical reference for the
-      // request's sender ID
-      if (dm.getDSFID() == JOIN_REQUEST) {
-        sender = ((JoinRequestMessage)dm).getMemberID();
+      if(isEncrypted) {
+        result = readEncryptedMessage(dis, ordinal, encrypt);
+      } else {
+        GMSMember m = DataSerializer.readObject(dis);
+  
+        result = DataSerializer.readObject(dis);
+  
+        DistributionMessage dm = (DistributionMessage)result;
+        
+        setSender(dm, m, ordinal);
       }
-      ((DistributionMessage)result).setSender(sender);
+      
       
       services.getStatistics().endMsgDeserialization(start);
     }
     catch (ClassNotFoundException | IOException | RuntimeException e) {
       problem = e;
+    } catch(Exception e) {
+      problem = e;
     }
     if (problem != null) {
       logger.error(LocalizedMessage.create(
@@ -972,37 +1015,113 @@ public class JGroupsMessenger implements Messenger {
     return result;
   }
   
-  
-  /** look for certain messages that may need to be altered before being sent */
-  void filterOutgoingMessage(DistributionMessage m) {
-    switch (m.getDSFID()) {
+  void setSender(DistributionMessage dm, GMSMember m, short ordinal) {
+    InternalDistributedMember sender = null;
+    // JoinRequestMessages are sent with an ID that may have been
+    // reused from a previous life by way of auto-reconnect,
+    // so we don't want to find a canonical reference for the
+    // request's sender ID
+    if (dm.getDSFID() == JOIN_REQUEST) {
+      sender = ((JoinRequestMessage)dm).getMemberID();
+    } else {
+      sender = getMemberFromView(m, ordinal);
+    }
+    dm.setSender(sender);
+  }
+
+  @SuppressWarnings("resource")
+  DistributionMessage readEncryptedMessage(DataInputStream dis, short ordinal, GMSEncrypt encryptLocal) throws Exception {
+    int dfsid = InternalDataSerializer.readDSFIDHeader(dis);
+    int requestId = dis.readInt();
+
+    try {
+      // TODO seems like we don't need this, just set bit that PK is appended
+
+      logger.debug("readEncryptedMessage Reading Request id " + dfsid + " and requestid is " + requestId + " myid " + this.localAddress);
+      InternalDistributedMember pkMbr = null;
+      boolean readPK = false;
+      switch (dfsid) {
+      case FIND_COORDINATOR_REQ:
       case JOIN_REQUEST:
-        if (encrypt == null) {
-          break;
-        }
-        JoinRequestMessage joinMsg = (JoinRequestMessage)m;
-        joinMsg.setPublicKey(encrypt.getPublicKeyBytes());
+        readPK = true;
         break;
-
+      case FIND_COORDINATOR_RESP:
       case JOIN_RESPONSE:
-        JoinResponseMessage jrsp = (JoinResponseMessage)m;
+        // this will have requestId to know the PK
+        pkMbr = getRequestedMember(requestId);
+        break;
+      }
+
+      byte[] data;
+
+      byte[] pk = null;
+
+      if (readPK) {
+        // need to read PK
+        pk = InternalDataSerializer.readByteArray(dis);
+        // encrypt.setPublicKey(publickey, mbr);
+        data = InternalDataSerializer.readByteArray(dis);
+        // using prefixed pk from sender
+        data = encryptLocal.decryptData(data, pk);
+      } else {
+        data = InternalDataSerializer.readByteArray(dis);
+        // no prefixed PK: use the member's key if known, else the cluster key
+        if (pkMbr != null) {
+          // using member public key
+          data = encryptLocal.decryptData(data, pkMbr);
+        } else {
+          // from cluster key
+          data = encryptLocal.decryptData(data);
+        }
+      }
+
+      {
+        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
+
+        if (ordinal < Version.CURRENT_ORDINAL) {
+          in = new VersionedDataInputStream(in, Version.fromOrdinalNoThrow(ordinal, true));
+        }
+
+        GMSMember m = DataSerializer.readObject(in);
+
+        DistributionMessage result = (DistributionMessage) DataSerializer.readObject(in);
+
+        setSender(result, m, ordinal);
 
-        if (jrsp.getRejectionMessage() == null
+        if (pk != null) {
+          encryptLocal.setPublicKey(pk, result.getSender());
+        }
+
+        return result;
+      }
+    } catch (Exception e) {
+      throw new Exception("Message id is " + dfsid, e);
+    }
+
+  }
+  
+  /** look for certain messages that may need to be altered before being sent */
+  void filterOutgoingMessage(DistributionMessage m) {
+    switch (m.getDSFID()) {
+    case JOIN_RESPONSE:
+      JoinResponseMessage jrsp = (JoinResponseMessage)m;
+      
+      if (jrsp.getRejectionMessage() == null
           &&  services.getConfig().getTransport().isMcastEnabled()) {
-          // get the multicast message digest and pass it with the join response
-          Digest digest = (Digest)this.myChannel.getProtocolStack()
+        // get the multicast message digest and pass it with the join response
+        Digest digest = (Digest)this.myChannel.getProtocolStack()
             .getTopProtocol().down(Event.GET_DIGEST_EVT);
-          HeapDataOutputStream hdos = new HeapDataOutputStream(500, Version.CURRENT);
-          try {
-            digest.writeTo(hdos);
-          } catch (Exception e) {
-            logger.fatal("Unable to serialize JGroups messaging digest", e);
-          }
-          jrsp.setMessengerData(hdos.toByteArray());
+        HeapDataOutputStream hdos = new HeapDataOutputStream(500, Version.CURRENT);
+        try {
+          digest.writeTo(hdos);
+        } catch (Exception e) {
+          logger.fatal("Unable to serialize JGroups messaging digest", e);
         }
-        break;
-      default:
-        break;
+        jrsp.setMessengerData(hdos.toByteArray());
+      }
+      break;
+    default:
+      break;
     }
   }
   
@@ -1190,7 +1309,7 @@ public class JGroupsMessenger implements Messenger {
           if (clazz.isAssignableFrom(msgClazz)) {
             h = handlers.get(clazz);
             handlers.put(msg.getClass(), h);
-            break;
+            break;              
           }
         }
       }
@@ -1201,6 +1320,72 @@ public class JGroupsMessenger implements Messenger {
     }
   }
   
+  @Override
+  public Set<InternalDistributedMember> send(DistributionMessage msg, NetView alternateView) {
+    if (this.encrypt != null) {
+      this.encrypt.installView(alternateView);      
+    }
+    return send(msg, true);
+  }
+
+  @Override
+  public byte[] getPublickey(InternalDistributedMember mbr) {
+    if (encrypt != null) {
+      return encrypt.getPublicKey(mbr);
+    }
+    return null;
+  }
+
+  @Override
+  public void setPublicKey(byte[] publickey, InternalDistributedMember mbr) {
+    if (encrypt != null) {
+      logger.debug("Setting pK for member " + mbr);
+      encrypt.setPublicKey(publickey, mbr);
+    }
+  }
+
+  @Override
+  public void setClusterSecretKey(byte[] clusterSecretKey) {
+    if (encrypt != null) {
+      logger.debug("Setting cluster key");
+      encrypt.addClusterKey(clusterSecretKey);
+    }
+  }
+
+  @Override
+  public byte[] getClusterSecretKey() {
+    if (encrypt != null) {
+      return encrypt.getClusterSecretKey();
+    }
+    return null;
+  }
+
+  private Random randomId = new Random();
+  private HashMap<Integer, InternalDistributedMember> requestIdVsRecipients = new HashMap<>();
   
+  InternalDistributedMember getRequestedMember(int requestId) {
+    // TODO: if no response ever arrives the entry is never removed, so this map will leak
+    return requestIdVsRecipients.remove(requestId);
+  }
+  
+  void addRequestId(int requestId, InternalDistributedMember mbr) {
+    requestIdVsRecipients.put(requestId, mbr);
+  }
   
+  @Override
+  public int getRequestId() {
+    return randomId.nextInt();
+  }
+
+  @Override
+  public void initClusterKey() {
+    if (encrypt != null) {
+      try {
+        logger.debug("Initializing cluster key");
+        encrypt.initClusterSecretKey();
+      } catch (Exception e) {
+        throw new RuntimeException("unable to create cluster key ", e);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b6a73441/geode-core/src/main/java/com/gemstone/gemfire/internal/InternalDataSerializer.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/InternalDataSerializer.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/InternalDataSerializer.java
index bff592b..4574292 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/InternalDataSerializer.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/InternalDataSerializer.java
@@ -2751,6 +2751,24 @@ public abstract class InternalDataSerializer extends DataSerializer implements D
     }
   }
   
+  public static final int readDSFIDHeader(final DataInput in)
+      throws IOException, ClassNotFoundException
+    {
+      checkIn(in);
+      byte header = in.readByte();
+      if (header == DS_FIXED_ID_BYTE) {
+        return in.readByte();
+      } else if (header == DS_FIXED_ID_SHORT) {
+        return in.readShort();
+      } else if (header == DS_NO_FIXED_ID) {
+        return Integer.MAX_VALUE;//is that correct??
+      } else if (header == DS_FIXED_ID_INT) {
+        return in.readInt();
+      } else {
+        throw new IllegalStateException("unexpected byte: " + header + " while reading dsfid");
+      }
+    }
+  
   /**
    * Reads an instance of <code>String</code> from a
    * <code>DataInput</code> given the header byte already being read.

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b6a73441/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedMulticastRegionDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedMulticastRegionDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedMulticastRegionDUnitTest.java
index 011b8f5..582149c 100755
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedMulticastRegionDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedMulticastRegionDUnitTest.java
@@ -256,6 +256,7 @@ public class DistributedMulticastRegionDUnitTest extends CacheTestCase {
     p.put(DistributionConfig.MCAST_TTL_NAME, mcastttl);
     p.put(DistributionConfig.LOCATORS_NAME, "localhost[" + locatorPort +"]");
     p.put(DistributionConfig.LOG_LEVEL_NAME, "info");
+    p.put(DistributionConfig.SECURITY_CLIENT_DHALGO_NAME, "AES:128");
     return p;
   } 
   
@@ -288,6 +289,7 @@ public class DistributedMulticastRegionDUnitTest extends CacheTestCase {
         locatorProps.setProperty(DistributionConfig.MCAST_PORT_NAME, mcastport);
         locatorProps.setProperty(DistributionConfig.MCAST_TTL_NAME, mcastttl);
         locatorProps.setProperty(DistributionConfig.LOG_LEVEL_NAME, "info");
+        locatorProps.setProperty(DistributionConfig.SECURITY_CLIENT_DHALGO_NAME, "AES:128");
         //locatorProps.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "true");
         try {
           final InternalLocator locator = (InternalLocator) Locator.startLocatorAndDS(locatorPort, null, null,


Mime
View raw message