geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r..@apache.org
Subject [25/51] [partial] incubator-geode git commit: WAN and CQ code drop under the Pivotal SGA
Date Wed, 20 Jan 2016 02:22:31 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/LocatorDiscovery.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/LocatorDiscovery.java b/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/LocatorDiscovery.java
new file mode 100644
index 0000000..04abc89
--- /dev/null
+++ b/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/LocatorDiscovery.java
@@ -0,0 +1,218 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal.locator.wan;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.distributed.internal.InternalLocator;
+import com.gemstone.gemfire.distributed.internal.WanLocatorDiscoverer;
+import com.gemstone.gemfire.distributed.internal.tcpserver.TcpClient;
+import com.gemstone.gemfire.internal.admin.remote.DistributionLocatorId;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import com.gemstone.gemfire.internal.tcp.ConnectionException;
+import com.gemstone.gemfire.cache.client.internal.locator.wan.LocatorMembershipListener;
+
+/**
+ * This class represent a runnable task which exchange the locator information
+ * with local locators(within the site) as well as remote locators (across the
+ * site)
+ * 
+ * @author kbachhav
+ * @since 7.0
+ */
+public class LocatorDiscovery{
+
+  private static final Logger logger = LogService.getLogger();
+  
+  private DistributionLocatorId locatorId;
+  
+  private LocatorMembershipListener locatorListener;
+  
+  RemoteLocatorJoinRequest request;
+
+  public static final int WAN_LOCATOR_CONNECTION_RETRY_ATTEMPT = Integer
+      .getInteger("WANLocator.CONNECTION_RETRY_ATTEMPT", 50000).intValue();
+
+  public static final int WAN_LOCATOR_CONNECTION_INTERVAL = Integer.getInteger(
+      "WANLocator.CONNECTION_INTERVAL", 10000).intValue();
+
+  public static final int WAN_LOCATOR_PING_INTERVAL = Integer.getInteger(
+      "WANLocator.PING_INTERVAL", 10000).intValue();
+
+  public LocatorDiscovery(DistributionLocatorId locotor,RemoteLocatorJoinRequest request,
+      LocatorMembershipListener locatorListener) {
+    this.locatorId = locotor;
+    this.request = request; 
+    this.locatorListener = locatorListener;
+  }
+  
+  /**
+   * When a batch fails, then this keeps the last time when a failure was logged
+   * . We don't want to swamp the logs in retries due to same batch failures.
+   */
+  private final ConcurrentHashMap<DistributionLocatorId, long[]> failureLogInterval = new ConcurrentHashMap<DistributionLocatorId, long[]>();
+
+  /**
+   * The maximum size of {@link #failureLogInterval} beyond which it will start
+   * logging all failure instances. Hopefully this should never happen in
+   * practice.
+   */
+  private static final int FAILURE_MAP_MAXSIZE = Integer.getInteger(
+      "gemfire.GatewaySender.FAILURE_MAP_MAXSIZE", 1000000);
+
+  /**
+   * The maximum interval for logging failures of the same event in millis.
+   */
+  private static final int FAILURE_LOG_MAX_INTERVAL = Integer.getInteger(
+      "gemfire.LocatorDiscovery.FAILURE_LOG_MAX_INTERVAL", 300000);
+
+  public final boolean skipFailureLogging(DistributionLocatorId locatorId) {
+    boolean skipLogging = false;
+    if (this.failureLogInterval.size() < FAILURE_MAP_MAXSIZE) {
+      long[] logInterval = this.failureLogInterval.get(locatorId);
+      if (logInterval == null) {
+        logInterval = this.failureLogInterval.putIfAbsent(locatorId,
+            new long[] { System.currentTimeMillis(), 1000 });
+      }
+      if (logInterval != null) {
+        long currentTime = System.currentTimeMillis();
+        if ((currentTime - logInterval[0]) < logInterval[1]) {
+          skipLogging = true;
+        } else {
+          logInterval[0] = currentTime;
+          if (logInterval[1] <= (FAILURE_LOG_MAX_INTERVAL / 2)) {
+            logInterval[1] *= 2;
+          }
+        }
+      }
+    }
+    return skipLogging;
+  }
+
+
+  public class LocalLocatorDiscovery implements Runnable {
+    public void run() {
+      exchangeLocalLocators();
+    }
+  }
+
+  public class RemoteLocatorDiscovery implements Runnable {
+    public void run() {
+      exchangeRemoteLocators();
+    }
+  }
+  
+  
+  private void exchangeLocalLocators() {
+    int retryAttempt = 1;
+    while (true) {
+      try {
+        RemoteLocatorJoinResponse response = (RemoteLocatorJoinResponse)TcpClient
+            .requestToServer(locatorId.getHost(), locatorId.getPort(), request,
+                WanLocatorDiscoverer.WAN_LOCATOR_CONNECTION_TIMEOUT);
+        if (response != null) {
+          LocatorHelper.addExchnagedLocators(response.getLocators(),
+              this.locatorListener);
+          logger.info(LocalizedMessage.create(LocalizedStrings.LOCATOR_DISCOVERY_TASK_EXCHANGED_LOCATOR_INFORMATION_0_WITH_1,
+              new Object[] { request.getLocator(), locatorId }));
+          break;
+        }
+      }
+      catch (IOException ioe) {
+        if (retryAttempt == WAN_LOCATOR_CONNECTION_RETRY_ATTEMPT) {
+          ConnectionException coe = new ConnectionException(
+              "Not able to connect to local locator after "
+              + WAN_LOCATOR_CONNECTION_RETRY_ATTEMPT + " retry attempts",
+          ioe);
+          logger.fatal(LocalizedMessage.create(LocalizedStrings.LOCATOR_DISCOVERY_TASK_COULD_NOT_EXCHANGE_LOCATOR_INFORMATION_0_WITH_1_AFTER_2,
+              new Object[] { request.getLocator(),locatorId, retryAttempt }), coe);
+          break;
+        }
+        if (skipFailureLogging(locatorId)) {
+          logger.warn(LocalizedMessage.create(LocalizedStrings.LOCATOR_DISCOVERY_TASK_COULD_NOT_EXCHANGE_LOCATOR_INFORMATION_0_WITH_1_AFTER_2_RETRYING_IN_3_MS,
+              new Object[] { request.getLocator(), locatorId, retryAttempt, WAN_LOCATOR_CONNECTION_INTERVAL }));
+        }
+        try {
+          Thread.sleep(WAN_LOCATOR_CONNECTION_INTERVAL);
+        }
+        catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+        retryAttempt++;
+        continue;
+      }
+      catch (ClassNotFoundException cnfe) {
+        logger.fatal(LocalizedMessage.create(LocalizedStrings.LOCATOR_DISCOVERY_TASK_ENCOUNTERED_UNEXPECTED_EXCEPTION), cnfe);
+        break;
+      }
+    }
+  }
+  
+  public void exchangeRemoteLocators() {
+    int retryAttempt = 1;
+    DistributionLocatorId remoteLocator = this.locatorId;
+    while (true) {
+      RemoteLocatorJoinResponse response;
+      try {
+        response = (RemoteLocatorJoinResponse)TcpClient
+            .requestToServer(remoteLocator.getHost(), remoteLocator.getPort(),
+                request, WanLocatorDiscoverer.WAN_LOCATOR_CONNECTION_TIMEOUT);
+        if (response != null) {
+          LocatorHelper.addExchnagedLocators(response.getLocators(), this.locatorListener);
+          logger.info(LocalizedMessage.create(LocalizedStrings.LOCATOR_DISCOVERY_TASK_EXCHANGED_LOCATOR_INFORMATION_0_WITH_1,
+              new Object[] { request.getLocator(), locatorId }));
+          RemoteLocatorPingRequest pingRequest = new RemoteLocatorPingRequest(
+              "");
+          while (true) {
+            Thread.sleep(WAN_LOCATOR_PING_INTERVAL);
+            RemoteLocatorPingResponse pingResponse = (RemoteLocatorPingResponse) TcpClient
+                .requestToServer(remoteLocator.getHost(),
+                    remoteLocator.getPort(), pingRequest,
+                    WanLocatorDiscoverer.WAN_LOCATOR_CONNECTION_TIMEOUT);
+            if (pingResponse != null) {
+              continue;
+            }
+            break;
+          }
+        }
+      }
+      catch (IOException ioe) {
+        if (retryAttempt == WAN_LOCATOR_CONNECTION_RETRY_ATTEMPT) {
+          logger.fatal(LocalizedMessage.create(LocalizedStrings.LOCATOR_DISCOVERY_TASK_COULD_NOT_EXCHANGE_LOCATOR_INFORMATION_0_WITH_1_AFTER_2,
+              new Object[] { request.getLocator(), remoteLocator, retryAttempt}), ioe);
+          break;
+        }
+        if (skipFailureLogging(remoteLocator)) {
+          logger.warn(LocalizedMessage.create(LocalizedStrings.LOCATOR_DISCOVERY_TASK_COULD_NOT_EXCHANGE_LOCATOR_INFORMATION_0_WITH_1_AFTER_2_RETRYING_IN_3_MS,
+              new Object[] { request.getLocator(), remoteLocator, retryAttempt, WAN_LOCATOR_CONNECTION_INTERVAL }));
+        }
+        try {
+          Thread.sleep(WAN_LOCATOR_CONNECTION_INTERVAL);
+        }
+        catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+        retryAttempt++;
+        continue;
+      }
+      catch (ClassNotFoundException cnfe) {
+        logger.fatal(LocalizedMessage.create(LocalizedStrings.LOCATOR_DISCOVERY_TASK_ENCOUNTERED_UNEXPECTED_EXCEPTION), cnfe);
+        break;
+      }
+      catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/LocatorHelper.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/LocatorHelper.java b/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/LocatorHelper.java
new file mode 100644
index 0000000..a5b57c8
--- /dev/null
+++ b/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/LocatorHelper.java
@@ -0,0 +1,134 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal.locator.wan;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import com.gemstone.gemfire.distributed.internal.InternalLocator;
+import com.gemstone.gemfire.cache.client.internal.locator.wan.LocatorMembershipListener;
+import com.gemstone.gemfire.distributed.internal.WanLocatorDiscoverer;
+import com.gemstone.gemfire.internal.CopyOnWriteHashSet;
+import com.gemstone.gemfire.internal.admin.remote.DistributionLocatorId;
+/**
+ * This is a helper class which helps to add the locator information to the allLocatorInfoMap.
+ * 
+ * @author Kishor Bachhav
+ *
+ */
+public class LocatorHelper {
+  
+  public final static Object locatorObject = new Object();
+  /**
+   * 
+   * This methods add the given locator to allLocatorInfoMap.
+   * It also invokes a locatorlistener to inform other locators in allLocatorInfoMap about this newly added locator.
+   * @param distributedSystemId
+   * @param locator
+   * @param locatorListener
+   * @param sourceLocator
+   */
+  public static boolean addLocator(int distributedSystemId,
+      DistributionLocatorId locator, LocatorMembershipListener locatorListener,
+      DistributionLocatorId sourceLocator) {
+      ConcurrentHashMap<Integer, Set<DistributionLocatorId>> allLocatorsInfo = (ConcurrentHashMap<Integer, Set<DistributionLocatorId>>)locatorListener
+          .getAllLocatorsInfo();
+      Set<DistributionLocatorId> locatorsSet = new CopyOnWriteHashSet<DistributionLocatorId>();
+      locatorsSet.add(locator);
+      Set<DistributionLocatorId> existingValue = allLocatorsInfo.putIfAbsent(distributedSystemId, locatorsSet);
+      if(existingValue != null){
+        if (!existingValue.contains(locator)) {
+          existingValue.add(locator);
+          addServerLocator(distributedSystemId, locatorListener, locator);
+          locatorListener.locatorJoined(distributedSystemId, locator,
+              sourceLocator);
+        }
+        else {
+          return false;
+        }
+      }else{
+        addServerLocator(distributedSystemId, locatorListener, locator);
+        locatorListener.locatorJoined(distributedSystemId, locator,
+          sourceLocator);
+      }
+    return true;
+  }
+
+  /**
+   * This methods decides whether the given locator is server locator, if so
+   * then add this locator in allServerLocatorsInfo map.
+   * 
+   * @param distributedSystemId
+   * @param locatorListener
+   * @param locator
+   */
+  private static void addServerLocator(Integer distributedSystemId,
+      LocatorMembershipListener locatorListener, DistributionLocatorId locator) {
+    if (!locator.isServerLocator()) {
+      return;
+    }
+    ConcurrentHashMap<Integer, Set<String>> allServerLocatorsInfo = (ConcurrentHashMap<Integer, Set<String>>)locatorListener
+        .getAllServerLocatorsInfo();
+    
+    Set<String> locatorsSet = new CopyOnWriteHashSet<String>();
+    locatorsSet.add(locator.toString());
+    Set<String> existingValue = allServerLocatorsInfo.putIfAbsent(distributedSystemId, locatorsSet);
+    if(existingValue != null){
+      if (!existingValue.contains(locator.toString())) {
+        existingValue.add(locator.toString());
+      }
+    }
+  }
+
+  /**
+   * This method adds the map of locatorsinfo sent by other locator to this locator's allLocatorInfo
+   * 
+   * @param locators
+   * @param locatorListener
+   */
+  public static boolean addExchnagedLocators(Map<Integer, Set<DistributionLocatorId>> locators,
+      LocatorMembershipListener locatorListener) {
+
+    ConcurrentHashMap<Integer, Set<DistributionLocatorId>> allLocators = (ConcurrentHashMap<Integer, Set<DistributionLocatorId>>)locatorListener
+        .getAllLocatorsInfo();
+    if (!allLocators.equals(locators)) {
+      for (Map.Entry<Integer, Set<DistributionLocatorId>> entry : locators
+          .entrySet()) {
+        Set<DistributionLocatorId> existingValue = allLocators.putIfAbsent(
+            entry.getKey(), new CopyOnWriteHashSet<DistributionLocatorId>(entry
+                .getValue()));
+
+        if (existingValue != null) {
+          Set<DistributionLocatorId> localLocators = allLocators.get(entry
+              .getKey());
+          if (!localLocators.equals(entry.getValue())) {
+            entry.getValue().removeAll(localLocators);
+            for (DistributionLocatorId locator : entry.getValue()) {
+              localLocators.add(locator);
+              addServerLocator(entry.getKey(), locatorListener, locator);
+              locatorListener.locatorJoined(entry.getKey(), locator, null);
+            }
+          }
+
+        }
+        else {
+          for (DistributionLocatorId locator : entry.getValue()) {
+            addServerLocator(entry.getKey(), locatorListener, locator);
+            locatorListener.locatorJoined(entry.getKey(), locator, null);
+          }
+        }
+      }
+      return true;
+    }
+    return false;
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/LocatorJoinMessage.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/LocatorJoinMessage.java b/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/LocatorJoinMessage.java
new file mode 100644
index 0000000..124ca2e
--- /dev/null
+++ b/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/LocatorJoinMessage.java
@@ -0,0 +1,96 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal.locator.wan;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.cache.client.internal.locator.ServerLocationRequest;
+import com.gemstone.gemfire.internal.DataSerializableFixedID;
+import com.gemstone.gemfire.internal.admin.remote.DistributionLocatorId;
+
+public class LocatorJoinMessage extends ServerLocationRequest {
+
+  private DistributionLocatorId locator;
+  
+  private int distributedSystemId;
+  
+  private DistributionLocatorId sourceLocator;
+
+  public LocatorJoinMessage() {
+    super();
+  }
+
+  public LocatorJoinMessage(int distributedSystemId, DistributionLocatorId locator,
+      DistributionLocatorId sourceLocator, String serverGroup) {
+    super(serverGroup);
+    this.locator = locator;
+    this.distributedSystemId = distributedSystemId;
+    this.sourceLocator = sourceLocator;
+  }
+
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    super.fromData(in);
+    this.locator = DataSerializer.readObject(in);
+    this.distributedSystemId = in.readInt();
+    this.sourceLocator = DataSerializer.readObject(in);
+  }
+
+  public void toData(DataOutput out) throws IOException {
+    super.toData(out);
+    DataSerializer.writeObject(locator, out);
+    out.writeInt(this.distributedSystemId);
+    DataSerializer.writeObject(sourceLocator, out);
+  }
+
+  public DistributionLocatorId getLocator() {
+    return this.locator;
+  }
+
+  public int getDistributedSystemId() {
+    return distributedSystemId;
+  }
+  
+  public DistributionLocatorId getSourceLocator() {
+    return sourceLocator;
+  }
+  
+  public int getDSFID() {
+    return DataSerializableFixedID.LOCATOR_JOIN_MESSAGE;
+  }
+
+  @Override
+  public String toString() {
+    return "LocatorJoinMessage{distributedSystemId="+ distributedSystemId +" locators=" + locator + " Source Locator : " + sourceLocator +"}";
+  }
+
+  @Override
+  public boolean equals(Object obj){
+    if ( this == obj ) return true;
+    if ( !(obj instanceof LocatorJoinMessage) ) return false;
+    LocatorJoinMessage myObject = (LocatorJoinMessage)obj;
+    if((this.distributedSystemId == myObject.getDistributedSystemId()) && this.locator.equals(myObject.getLocator())){
+      return true;
+    }
+    return false;
+  }
+  
+  @Override
+  public int hashCode() {
+    // it is sufficient for all messages having the same locator to hash to the same bucket
+    if (this.locator == null) {
+      return 0;
+    } else {
+      return this.locator.hashCode();
+    }
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/LocatorMembershipListenerImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/LocatorMembershipListenerImpl.java b/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/LocatorMembershipListenerImpl.java
new file mode 100644
index 0000000..67049e6
--- /dev/null
+++ b/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/LocatorMembershipListenerImpl.java
@@ -0,0 +1,221 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal.locator.wan;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.distributed.Locator;
+import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.distributed.internal.InternalLocator;
+import com.gemstone.gemfire.cache.client.internal.locator.wan.LocatorMembershipListener;
+import com.gemstone.gemfire.distributed.internal.WanLocatorDiscoverer;
+import com.gemstone.gemfire.cache.client.internal.locator.wan.LocatorJoinMessage;
+import com.gemstone.gemfire.cache.client.internal.locator.wan.RemoteLocatorJoinRequest;
+import com.gemstone.gemfire.cache.client.internal.locator.wan.RemoteLocatorJoinResponse;
+import com.gemstone.gemfire.cache.client.internal.locator.wan.RemoteLocatorPingRequest;
+import com.gemstone.gemfire.cache.client.internal.locator.wan.RemoteLocatorPingResponse;
+import com.gemstone.gemfire.cache.client.internal.locator.wan.RemoteLocatorRequest;
+import com.gemstone.gemfire.cache.client.internal.locator.wan.RemoteLocatorResponse;
+import com.gemstone.gemfire.internal.CopyOnWriteHashSet;
+import com.gemstone.gemfire.internal.DSFIDFactory;
+import com.gemstone.gemfire.internal.DataSerializableFixedID;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.admin.remote.DistributionLocatorId;
+import com.gemstone.gemfire.internal.cache.tier.Command;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.CommandInitializer;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import com.gemstone.gemfire.distributed.internal.tcpserver.TcpClient;
+
+/**
+ * An implementation of
+ * {@link com.gemstone.gemfire.cache.client.internal.locator.wan.LocatorMembershipListener}
+ * 
+ * @author kbachhav
+ * 
+ */
+public class LocatorMembershipListenerImpl implements LocatorMembershipListener {
+
+  private ConcurrentMap<Integer, Set<DistributionLocatorId>> allLocatorsInfo = new ConcurrentHashMap<Integer, Set<DistributionLocatorId>>();
+  
+  private ConcurrentMap<Integer, Set<String>> allServerLocatorsInfo = new ConcurrentHashMap<Integer, Set<String>>();
+  
+  private static final Logger logger = LogService.getLogger();
+  
+  private DistributionConfig config;
+  
+  private int port;
+  
+  public LocatorMembershipListenerImpl() {
+  }
+  
+  public void setPort(int port){
+    this.port = port;
+  }
+
+  public void setConfig(DistributionConfig config) {
+    this.config = config;
+  }
+  
+  /**
+   * When the new locator is added to remote locator metadata, inform all other
+   * locators in remote locator metadata about the new locator so that they can
+   * update their remote locator metadata.
+   * 
+   * @param locator
+   */
+  
+  public void locatorJoined(final int distributedSystemId,
+      final DistributionLocatorId locator,
+      final DistributionLocatorId sourceLocator) {
+    Thread distributeLocator = new Thread(new Runnable() {
+      public void run() {
+        ConcurrentMap<Integer, Set<DistributionLocatorId>> remoteLocators = getAllLocatorsInfo();
+        ArrayList<DistributionLocatorId> locatorsToRemove = new ArrayList<DistributionLocatorId>();
+        
+        String localLocator = config.getStartLocator();
+        DistributionLocatorId localLocatorId = null;
+        if (localLocator.equals(DistributionConfig.DEFAULT_START_LOCATOR)) {
+          localLocatorId = new DistributionLocatorId(port, config
+              .getBindAddress());
+        }
+        else {
+          localLocatorId = new DistributionLocatorId(localLocator);
+        }
+        locatorsToRemove.add(localLocatorId);
+        locatorsToRemove.add(locator);
+        locatorsToRemove.add(sourceLocator);
+        
+        Map<Integer, Set<DistributionLocatorId>> localCopy = new HashMap<Integer, Set<DistributionLocatorId>>();
+        for(Map.Entry<Integer, Set<DistributionLocatorId>> entry : remoteLocators.entrySet()){
+          Set<DistributionLocatorId> value = new CopyOnWriteHashSet<DistributionLocatorId>(entry.getValue());
+          localCopy.put(entry.getKey(), value);
+        }  
+        for(Map.Entry<Integer, Set<DistributionLocatorId>> entry : localCopy.entrySet()){
+          for(DistributionLocatorId removeLocId : locatorsToRemove){
+            if(entry.getValue().contains(removeLocId)){
+              entry.getValue().remove(removeLocId);
+            }
+          }
+          for (DistributionLocatorId value : entry.getValue()) {
+            try {
+              TcpClient.requestToServer(value.getHost(), value.getPort(),
+                  new LocatorJoinMessage(distributedSystemId, locator, localLocatorId, ""), 1000, false);
+            }
+            catch (Exception e) {
+              if (logger.isDebugEnabled()) {
+                logger.debug(LocalizedMessage.create(LocalizedStrings.LOCATOR_MEMBERSHIP_LISTENER_COULD_NOT_EXCHANGE_LOCATOR_INFORMATION_0_1_WIHT_2_3, 
+                    new Object[] { locator.getHost(), locator.getPort(), value.getHost(), value.getPort() }));
+              }
+            }
+            try {
+              TcpClient.requestToServer(locator.getHost(), locator.getPort(),
+                  new LocatorJoinMessage(entry.getKey(), value, localLocatorId, ""), 1000, false);
+            }
+            catch (Exception e) {
+              if (logger.isDebugEnabled()) {
+                logger.debug(LocalizedMessage.create(LocalizedStrings.LOCATOR_MEMBERSHIP_LISTENER_COULD_NOT_EXCHANGE_LOCATOR_INFORMATION_0_1_WIHT_2_3,
+                    new Object[] { value.getHost(), value.getPort(), locator.getHost(), locator.getPort() }));
+              }
+            }
+          }
+        }
+      }
+    });
+    distributeLocator.setDaemon(true);
+    distributeLocator.start();
+  }
+
+  public Object handleRequest(Object request) {
+    Object response = null;
+    if (request instanceof RemoteLocatorJoinRequest) {
+      response = updateAllLocatorInfo((RemoteLocatorJoinRequest)request);
+    }
+    else if (request instanceof LocatorJoinMessage) {
+      response = informAboutRemoteLocators((LocatorJoinMessage)request);
+    }
+    else if (request instanceof RemoteLocatorPingRequest) {
+      response = getPingResponse((RemoteLocatorPingRequest)request);
+    }
+    else if (request instanceof RemoteLocatorRequest) {
+      response = getRemoteLocators((RemoteLocatorRequest)request);
+    }
+    return response;
+  }
+  
+  /**
+   * A locator from the request is checked against the existing remote locator
+   * metadata. If it is not available then added to existing remote locator
+   * metadata and LocatorMembershipListener is invoked to inform about the
+   * this newly added locator to all other locators available in remote locator
+   * metadata. As a response, remote locator metadata is sent.
+   * 
+   * @param request
+   */
+  private synchronized Object updateAllLocatorInfo(RemoteLocatorJoinRequest request) {
+    int distributedSystemId = request.getDistributedSystemId();
+    DistributionLocatorId locator = request.getLocator();
+
+    LocatorHelper.addLocator(distributedSystemId, locator, this, null);
+    return new RemoteLocatorJoinResponse(this.getAllLocatorsInfo());
+  }
+  
+  private Object getPingResponse(RemoteLocatorPingRequest request) {
+   return new RemoteLocatorPingResponse();
+  }
+  
+  private Object informAboutRemoteLocators(LocatorJoinMessage request){
+    // TODO: FInd out the importance of list locatorJoinMessages. During
+    // refactoring I could not understand its significance
+//    synchronized (locatorJoinObject) {
+//      if (locatorJoinMessages.contains(request)) {
+//        return null;
+//      }
+//      locatorJoinMessages.add(request);  
+//    }
+    int distributedSystemId = request.getDistributedSystemId();
+    DistributionLocatorId locator = request.getLocator();
+    DistributionLocatorId sourceLocatorId = request.getSourceLocator();
+
+    LocatorHelper.addLocator(distributedSystemId, locator, this, sourceLocatorId);
+    return null;
+  }
+  
+  private Object getRemoteLocators(RemoteLocatorRequest request) {
+    int dsId = request.getDsId();
+    Set<String> locators = this.getRemoteLocatorInfo(dsId);
+    return new RemoteLocatorResponse(locators);
+  }
+  
+  public Set<String> getRemoteLocatorInfo(int dsId) {
+    return this.allServerLocatorsInfo.get(dsId);
+  }
+
+  public ConcurrentMap<Integer,Set<DistributionLocatorId>> getAllLocatorsInfo() {
+    return this.allLocatorsInfo;
+  }
+  
+  public ConcurrentMap<Integer,Set<String>> getAllServerLocatorsInfo() {
+    return this.allServerLocatorsInfo;
+  }
+  
+  public void clearLocatorInfo(){
+    allLocatorsInfo.clear();
+    allServerLocatorsInfo.clear();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorJoinRequest.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorJoinRequest.java b/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorJoinRequest.java
new file mode 100644
index 0000000..b31a6bd
--- /dev/null
+++ b/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorJoinRequest.java
@@ -0,0 +1,78 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal.locator.wan;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.cache.client.internal.locator.ServerLocationRequest;
+import com.gemstone.gemfire.internal.DataSerializableFixedID;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.admin.remote.DistributionLocatorId;
+
+/**
+ * Requests remote locators of a remote WAN site
+ * 
+ * @author Suranjan Kumar
+ * @author Yogesh Mahajan
+ * @author Kishor Bachhav
+ * 
+ * @since 6.6
+ * 
+ */
+public class RemoteLocatorJoinRequest implements DataSerializableFixedID {
+
+  private DistributionLocatorId locator = null;
+ 
+  private int distributedSystemId = -1;
+
+  public RemoteLocatorJoinRequest() {
+    super();
+  }
+
+  public RemoteLocatorJoinRequest(int distributedSystemId, DistributionLocatorId locator,
+      String serverGroup) {
+    this.distributedSystemId = distributedSystemId;
+    this.locator = locator;
+  }
+
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    this.locator = DataSerializer.readObject(in);
+    this.distributedSystemId = in.readInt();
+  }
+
+  public void toData(DataOutput out) throws IOException {
+    DataSerializer.writeObject(locator, out);
+    out.writeInt(this.distributedSystemId);
+  }
+
+  public DistributionLocatorId getLocator() {
+    return this.locator;
+  }
+  
+  public int getDistributedSystemId() {
+    return distributedSystemId;
+  }
+  
+  public int getDSFID() {
+    return DataSerializableFixedID.REMOTE_LOCATOR_JOIN_REQUEST;
+  }
+
+  @Override
+  public String toString() {
+    return "RemoteLocatorJoinRequest{locator=" + locator + "}";
+  }
+
+  @Override
+  public Version[] getSerializationVersions() {
+    return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorJoinResponse.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorJoinResponse.java b/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorJoinResponse.java
new file mode 100644
index 0000000..4bd478d
--- /dev/null
+++ b/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorJoinResponse.java
@@ -0,0 +1,80 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal.locator.wan;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.internal.CopyOnWriteHashSet;
+import com.gemstone.gemfire.internal.DataSerializableFixedID;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.admin.remote.DistributionLocatorId;
+
+/**
+ * List of remote locators as a response
+ * 
+ * @author Suranjan Kumar
+ * @author Yogesh Mahajan
+ * @author Kishor Bachhav
+ * 
+ * 
+ */
+public class RemoteLocatorJoinResponse implements DataSerializableFixedID{
+
+  private HashMap<Integer, Set<DistributionLocatorId>> locators = new HashMap<Integer, Set<DistributionLocatorId>>();
+  
+  /** Used by DataSerializer */
+  public RemoteLocatorJoinResponse() {
+    super();
+  }
+
+  public RemoteLocatorJoinResponse(
+      Map<Integer, Set<DistributionLocatorId>> locators) {
+    super();
+    this.locators = new HashMap<Integer, Set<DistributionLocatorId>>();
+    for (Map.Entry<Integer, Set<DistributionLocatorId>> entry : locators
+        .entrySet()) {
+      this.locators.put(entry.getKey(), new CopyOnWriteHashSet<DistributionLocatorId>(
+          entry.getValue()));
+    }
+  }
+  
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    this.locators = DataSerializer.readHashMap(in);
+    
+  }
+
+  public void toData(DataOutput out) throws IOException {
+    DataSerializer.writeHashMap(locators, out);
+  }
+
+  public Map<Integer, Set<DistributionLocatorId>> getLocators() {
+    return this.locators;
+  }
+
+  @Override
+  public String toString() {
+    return "RemoteLocatorJoinResponse{locators=" + locators + "}";
+  }
+
+  public int getDSFID() {
+    return DataSerializableFixedID.REMOTE_LOCATOR_JOIN_RESPONSE;
+  }
+
+  @Override
+  public Version[] getSerializationVersions() {
+    return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorPingRequest.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorPingRequest.java b/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorPingRequest.java
new file mode 100644
index 0000000..937b676
--- /dev/null
+++ b/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorPingRequest.java
@@ -0,0 +1,47 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal.locator.wan;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.gemstone.gemfire.internal.DataSerializableFixedID;
+import com.gemstone.gemfire.internal.Version;
+
+/**
+ * 
+ * @author Kishor Bachhav
+ *
+ */
+
+public class RemoteLocatorPingRequest implements DataSerializableFixedID{
+
+  public RemoteLocatorPingRequest() {
+    super();
+  }
+
+  public RemoteLocatorPingRequest(String serverGroup) {
+  }
+
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+  }
+
+  public void toData(DataOutput out) throws IOException {
+  }
+
+  public int getDSFID() {
+    return DataSerializableFixedID.REMOTE_LOCATOR_PING_REQUEST;
+  }
+
+  @Override
+  public Version[] getSerializationVersions() {
+    return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorPingResponse.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorPingResponse.java b/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorPingResponse.java
new file mode 100644
index 0000000..3978e81
--- /dev/null
+++ b/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorPingResponse.java
@@ -0,0 +1,46 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal.locator.wan;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.gemstone.gemfire.internal.DataSerializableFixedID;
+import com.gemstone.gemfire.internal.Version;
+
+/**
+ * 
+ * @author Kishor Bachhav
+ */
+public class RemoteLocatorPingResponse implements DataSerializableFixedID {
+
+
+  /** Used by DataSerializer */
+  public RemoteLocatorPingResponse() {
+    super();
+  }
+
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+  }
+
+  public void toData(DataOutput out) throws IOException {
+  }
+
+
+
+  public int getDSFID() {
+    return DataSerializableFixedID.REMOTE_LOCATOR_PING_RESPONSE;
+  }
+
+  @Override
+  public Version[] getSerializationVersions() {
+    return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorRequest.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorRequest.java b/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorRequest.java
new file mode 100644
index 0000000..8187531
--- /dev/null
+++ b/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorRequest.java
@@ -0,0 +1,57 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal.locator.wan;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.gemstone.gemfire.internal.DataSerializableFixedID;
+import com.gemstone.gemfire.internal.Version;
+/**
+ * 
+ * @author Suranjan Kumar
+ * @author Yogesh Mahajan
+ * @author Kishor Bachhav
+ *
+ */
+public class RemoteLocatorRequest implements DataSerializableFixedID{
+  private int distributedSystemId ;
+
+  public RemoteLocatorRequest() {
+    super();
+  }
+  public RemoteLocatorRequest(int dsId, String serverGroup) {
+    this.distributedSystemId = dsId;
+  }
+  
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    this.distributedSystemId = in.readInt();
+  }
+
+  public void toData(DataOutput out) throws IOException {
+    out.writeInt(this.distributedSystemId);
+  }
+
+  public int getDsId() {
+    return this.distributedSystemId;
+  }
+  
+  public int getDSFID() {
+    return DataSerializableFixedID.REMOTE_LOCATOR_REQUEST;
+  }
+
+  @Override
+  public String toString() {
+    return "RemoteLocatorRequest{dsName=" + distributedSystemId + "}";
+  }
+  @Override
+  public Version[] getSerializationVersions() {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorResponse.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorResponse.java b/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorResponse.java
new file mode 100644
index 0000000..5c61bdf
--- /dev/null
+++ b/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorResponse.java
@@ -0,0 +1,65 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal.locator.wan;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Set;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.internal.DataSerializableFixedID;
+import com.gemstone.gemfire.internal.Version;
+
+/**
+ * 
+ * @author Suranjan Kumar
+ * @author Yogesh Mahajan
+ * @author Kishor Bachhav
+ *
+ */
+public class RemoteLocatorResponse implements DataSerializableFixedID{
+
+  private Set<String> locators ;
+
+  /** Used by DataSerializer */
+  public RemoteLocatorResponse() {
+    super();
+  }
+  
+  public RemoteLocatorResponse(Set<String> locators) {
+    this.locators = locators;
+  }
+  
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    this.locators = DataSerializer.readObject(in);
+  }
+
+  public void toData(DataOutput out) throws IOException {
+    DataSerializer.writeObject(this.locators, out);
+  }
+
+  public Set<String> getLocators() {
+    return this.locators;
+  }
+  
+  @Override
+  public String toString() {
+    return "RemoteLocatorResponse{locators=" + locators +"}";
+  }
+
+  public int getDSFID() {
+    return DataSerializableFixedID.REMOTE_LOCATOR_RESPONSE;
+  }
+
+  @Override
+  public Version[] getSerializationVersions() {
+     return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/WANFactoryImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/WANFactoryImpl.java b/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/WANFactoryImpl.java
new file mode 100644
index 0000000..a44a449
--- /dev/null
+++ b/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/WANFactoryImpl.java
@@ -0,0 +1,58 @@
+package com.gemstone.gemfire.cache.client.internal.locator.wan;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.wan.GatewayReceiverFactory;
+import com.gemstone.gemfire.cache.wan.GatewaySenderFactory;
+import com.gemstone.gemfire.distributed.internal.WanLocatorDiscoverer;
+import com.gemstone.gemfire.internal.DSFIDFactory;
+import com.gemstone.gemfire.internal.DataSerializableFixedID;
+import com.gemstone.gemfire.internal.cache.wan.GatewayReceiverFactoryImpl;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderFactoryImpl;
+import com.gemstone.gemfire.internal.cache.wan.spi.WANFactory;
+
+public class WANFactoryImpl implements WANFactory {
+  
+  @Override
+  public void initialize() {
+    DSFIDFactory.registerDSFID(
+        DataSerializableFixedID.REMOTE_LOCATOR_JOIN_REQUEST,
+        RemoteLocatorJoinRequest.class);
+    DSFIDFactory.registerDSFID(
+        DataSerializableFixedID.REMOTE_LOCATOR_JOIN_RESPONSE,
+        RemoteLocatorJoinResponse.class);
+    DSFIDFactory.registerDSFID(DataSerializableFixedID.REMOTE_LOCATOR_REQUEST,
+        RemoteLocatorRequest.class);
+    DSFIDFactory.registerDSFID(DataSerializableFixedID.LOCATOR_JOIN_MESSAGE,
+        LocatorJoinMessage.class);
+    DSFIDFactory.registerDSFID(
+        DataSerializableFixedID.REMOTE_LOCATOR_PING_REQUEST,
+        RemoteLocatorPingRequest.class);
+    DSFIDFactory.registerDSFID(
+        DataSerializableFixedID.REMOTE_LOCATOR_PING_RESPONSE,
+        RemoteLocatorPingResponse.class);
+    DSFIDFactory.registerDSFID(DataSerializableFixedID.REMOTE_LOCATOR_RESPONSE,
+        RemoteLocatorResponse.class);
+  }
+
+  @Override
+  public GatewaySenderFactory createGatewaySenderFactory(Cache cache) {
+    return new GatewaySenderFactoryImpl(cache);
+  }
+
+  @Override
+  public GatewayReceiverFactory createGatewayReceiverFactory(Cache cache) {
+    return new GatewayReceiverFactoryImpl(cache);
+  }
+
+  @Override
+  public WanLocatorDiscoverer createLocatorDiscoverer() {
+    return new WanLocatorDiscovererImpl();
+  }
+
+  @Override
+  public LocatorMembershipListener createLocatorMembershipListener() {
+    return new LocatorMembershipListenerImpl();
+  }
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/WanLocatorDiscovererImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/WanLocatorDiscovererImpl.java b/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/WanLocatorDiscovererImpl.java
new file mode 100644
index 0000000..e6ef6e0
--- /dev/null
+++ b/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/WanLocatorDiscovererImpl.java
@@ -0,0 +1,129 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal.locator.wan;
+
+import java.util.Set;
+import java.util.StringTokenizer;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.distributed.internal.DistributionConfigImpl;
+import com.gemstone.gemfire.cache.client.internal.locator.wan.LocatorDiscovery;
+import com.gemstone.gemfire.cache.client.internal.locator.wan.LocatorMembershipListener;
+import com.gemstone.gemfire.cache.client.internal.locator.wan.RemoteLocatorJoinRequest;
+import com.gemstone.gemfire.distributed.internal.WanLocatorDiscoverer;
+import com.gemstone.gemfire.internal.admin.remote.DistributionLocatorId;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
+
+public class WanLocatorDiscovererImpl implements WanLocatorDiscoverer{
+
+  private static final Logger logger = LogService.getLogger();
+  
+  private ExecutorService _executor;
+  
+  public WanLocatorDiscovererImpl() {
+    
+  }
+  
+  @Override
+  public void discover(int port, DistributionConfigImpl config, LocatorMembershipListener locatorListener) {
+    final LoggingThreadGroup loggingThreadGroup = LoggingThreadGroup
+        .createThreadGroup("WAN Locator Discovery Logger Group", logger);
+
+    final ThreadFactory threadFactory = new ThreadFactory() {
+      public Thread newThread(final Runnable task) {
+        final Thread thread = new Thread(loggingThreadGroup, task,
+            "WAN Locator Discovery Thread");
+        thread.setDaemon(true);
+        return thread;
+      }
+    };
+
+    this._executor = Executors.newCachedThreadPool(threadFactory);
+    exchangeLocalLocators(port, config, locatorListener);
+    exchangeRemoteLocators(port, config, locatorListener);
+    this._executor.shutdown();
+  }
+  
+  
+  /**
+   * For WAN 70 Exchange the locator information within the distributed system
+   * 
+   * @param config
+   */
+  private void exchangeLocalLocators(int port, DistributionConfigImpl config, LocatorMembershipListener locatorListener) {
+    String localLocator = config.getStartLocator();
+    DistributionLocatorId locatorId = null;
+    if (localLocator.equals(DistributionConfig.DEFAULT_START_LOCATOR)) {
+      locatorId = new DistributionLocatorId(port, config.getBindAddress());
+    }
+    else {
+      locatorId = new DistributionLocatorId(localLocator);
+    }
+    LocatorHelper.addLocator(config.getDistributedSystemId(), locatorId, locatorListener, null);
+
+    RemoteLocatorJoinRequest request = buildRemoteDSJoinRequest(port, config);
+    StringTokenizer locatorsOnThisVM = new StringTokenizer(
+        config.getLocators(), ",");
+    while (locatorsOnThisVM.hasMoreTokens()) {
+      DistributionLocatorId localLocatorId = new DistributionLocatorId(
+          locatorsOnThisVM.nextToken());
+      if (!locatorId.equals(localLocatorId)) {
+        LocatorDiscovery localDiscovery = new LocatorDiscovery(localLocatorId, request, locatorListener);
+        LocatorDiscovery.LocalLocatorDiscovery localLocatorDiscovery = localDiscovery.new LocalLocatorDiscovery();
+        this._executor.execute(localLocatorDiscovery);
+      }
+    }
+  }
+  
+  /**
+   * For WAN 70 Exchange the locator information across the distributed systems
+   * (sites)
+   * 
+   * @param config
+   */
+  private void exchangeRemoteLocators(int port, DistributionConfigImpl config, LocatorMembershipListener locatorListener) {
+    RemoteLocatorJoinRequest request = buildRemoteDSJoinRequest(port, config);
+    String remoteDustributedSystems = config.getRemoteLocators();
+    if (remoteDustributedSystems.length() > 0) {
+      StringTokenizer remoteLocators = new StringTokenizer(
+          remoteDustributedSystems, ",");
+      while (remoteLocators.hasMoreTokens()) {
+        DistributionLocatorId remoteLocatorId = new DistributionLocatorId(
+            remoteLocators.nextToken());
+        LocatorDiscovery localDiscovery = new LocatorDiscovery(remoteLocatorId,
+            request, locatorListener);
+        LocatorDiscovery.RemoteLocatorDiscovery remoteLocatorDiscovery = localDiscovery.new RemoteLocatorDiscovery();
+        this._executor.execute(remoteLocatorDiscovery);
+      }
+    }
+  }
+  
+  private RemoteLocatorJoinRequest buildRemoteDSJoinRequest(int port,
+      DistributionConfigImpl config) {
+    String localLocator = config.getStartLocator();
+    DistributionLocatorId locatorId = null;
+    if (localLocator.equals(DistributionConfig.DEFAULT_START_LOCATOR)) {
+      locatorId = new DistributionLocatorId(port, config.getBindAddress());
+    }
+    else {
+      locatorId = new DistributionLocatorId(localLocator);
+    }
+    RemoteLocatorJoinRequest request = new RemoteLocatorJoinRequest(
+        config.getDistributedSystemId(), locatorId, "");
+    return request;
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractRemoteGatewaySender.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractRemoteGatewaySender.java b/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractRemoteGatewaySender.java
new file mode 100644
index 0000000..e9a0451
--- /dev/null
+++ b/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractRemoteGatewaySender.java
@@ -0,0 +1,153 @@
+package com.gemstone.gemfire.internal.cache.wan;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.StringTokenizer;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.client.PoolManager;
+import com.gemstone.gemfire.cache.client.internal.PoolImpl;
+import com.gemstone.gemfire.cache.client.internal.locator.wan.RemoteLocatorRequest;
+import com.gemstone.gemfire.cache.client.internal.locator.wan.RemoteLocatorResponse;
+import com.gemstone.gemfire.cache.wan.GatewayReceiver;
+import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.distributed.internal.WanLocatorDiscoverer;
+import com.gemstone.gemfire.internal.admin.remote.DistributionLocatorId;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.PoolFactoryImpl;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import com.gemstone.gemfire.distributed.internal.tcpserver.TcpClient;
+
+public abstract class AbstractRemoteGatewaySender extends AbstractGatewaySender {
+  private static final Logger logger = LogService.getLogger();
+  
+  public AbstractRemoteGatewaySender() {
+    
+  }
+  public AbstractRemoteGatewaySender(Cache cache, GatewaySenderAttributes attrs){
+    super(cache, attrs);
+  }
+  
+  /** used to reduce warning logs in case remote locator is down (#47634) */ 
+  protected int proxyFailureTries = 0; 
+  
+  public synchronized void initProxy() {
+    // return if it is being used for WBCL or proxy is already created
+    if (this.remoteDSId == DEFAULT_DISTRIBUTED_SYSTEM_ID || this.proxy != null
+        && !this.proxy.isDestroyed()) {
+      return;
+    }
+
+    int locatorCount = 0;
+    PoolFactoryImpl pf = (PoolFactoryImpl) PoolManager.createFactory();
+    pf.setPRSingleHopEnabled(false);
+    if (this.locatorDiscoveryCallback != null) {
+      pf.setLocatorDiscoveryCallback(locatorDiscoveryCallback);
+    }
+    pf.setReadTimeout(this.socketReadTimeout);
+    pf.setIdleTimeout(connectionIdleTimeOut);
+    pf.setSocketBufferSize(socketBufferSize);
+    pf.setServerGroup(GatewayReceiver.RECEIVER_GROUP);
+    RemoteLocatorRequest request = new RemoteLocatorRequest(this.remoteDSId, pf
+        .getPoolAttributes().getServerGroup());
+    String locators = ((GemFireCacheImpl) this.cache).getDistributedSystem()
+        .getConfig().getLocators();
+    if (logger.isDebugEnabled()) {
+      logger.debug("Gateway Sender is attempting to configure pool with remote locator information");
+    }
+    StringTokenizer locatorsOnThisVM = new StringTokenizer(locators, ",");
+    while (locatorsOnThisVM.hasMoreTokens()) {
+      String localLocator = locatorsOnThisVM.nextToken();
+      DistributionLocatorId locatorID = new DistributionLocatorId(localLocator);
+      try {
+        RemoteLocatorResponse response = (RemoteLocatorResponse) TcpClient
+            .requestToServer(locatorID.getHost(), locatorID.getPort(), request,
+                WanLocatorDiscoverer.WAN_LOCATOR_CONNECTION_TIMEOUT);
+
+        if (response != null) {
+          if (response.getLocators() == null) {
+            if (logProxyFailure()) {
+              logger.warn(LocalizedMessage.create(
+                  LocalizedStrings.AbstractGatewaySender_REMOTE_LOCATOR_FOR_REMOTE_SITE_0_IS_NOT_AVAILABLE_IN_LOCAL_LOCATOR_1,
+                      new Object[] { remoteDSId, localLocator }));
+            }
+            continue;
+          }
+          if (logger.isDebugEnabled()) {
+            logger.debug("Received the remote site {} location information:", this.remoteDSId, response.getLocators());
+          }
+          StringBuffer strBuffer = new StringBuffer();
+          Iterator<String> itr = response.getLocators().iterator();
+          while (itr.hasNext()) {
+            DistributionLocatorId locatorId = new DistributionLocatorId(itr.next());
+            pf.addLocator(locatorId.getHost().getHostName(), locatorId.getPort());
+            locatorCount++;
+          }
+          break;
+        }
+      } catch (IOException ioe) {
+        if (logProxyFailure()) {
+          // don't print stack trace for connection failures
+          String ioeStr = "";
+          if (!logger.isDebugEnabled() && ioe instanceof ConnectException) {
+            ioeStr = ": " + ioe.toString();
+            ioe = null;
+          }
+        logger.warn(LocalizedMessage.create(
+            LocalizedStrings.AbstractGatewaySender_SENDER_0_IS_NOT_ABLE_TO_CONNECT_TO_LOCAL_LOCATOR_1,
+                new Object[] { this.id, localLocator + ioeStr  }), ioe);        
+        }
+        continue;
+      } catch (ClassNotFoundException e) {
+        if (logProxyFailure()) {
+          logger.warn(LocalizedMessage.create(
+              LocalizedStrings.AbstractGatewaySender_SENDER_0_IS_NOT_ABLE_TO_CONNECT_TO_LOCAL_LOCATOR_1,
+                  new Object[] { this.id, localLocator }), e);
+        }
+        continue;
+      }
+    }
+
+    if (locatorCount == 0) {
+      if (logProxyFailure()) {
+        logger.fatal(LocalizedMessage.create(
+            LocalizedStrings.AbstractGatewaySender_SENDER_0_COULD_NOT_GET_REMOTE_LOCATOR_INFORMATION_FOR_SITE_1,
+                new Object[] { this.id, this.remoteDSId }));
+      }
+      this.proxyFailureTries++;
+      throw new GatewaySenderConfigurationException(
+          LocalizedStrings.AbstractGatewaySender_SENDER_0_COULD_NOT_GET_REMOTE_LOCATOR_INFORMATION_FOR_SITE_1
+              .toLocalizedString(new Object[] { this.id, this.remoteDSId}));
+    }
+    pf.init(this);
+    this.proxy = ((PoolImpl) pf.create(this.getId()));
+    if (this.proxyFailureTries > 0) {
+      logger.info(LocalizedMessage.create(LocalizedStrings.AbstractGatewaySender_SENDER_0_GOT_REMOTE_LOCATOR_INFORMATION_FOR_SITE_1,
+              new Object[] { this.id, this.remoteDSId, this.proxyFailureTries }));
+      this.proxyFailureTries = 0;
+    }
+  }
+  
+  protected boolean logProxyFailure() {
+    assert Thread.holdsLock(this);
+    // always log the first failure
+    if (logger.isDebugEnabled() || this.proxyFailureTries == 0) {
+      return true;
+    } else {
+      // subsequent failures will be logged on 30th, 300th, 3000th try
+      // each try is at 100millis from higher layer so this accounts for logging
+      // after 3s, 30s and then every 5mins
+      if (this.proxyFailureTries >= 3000) {
+        return (this.proxyFailureTries % 3000) == 0;
+      } else {
+        return (this.proxyFailureTries == 30 || this.proxyFailureTries == 300);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewayReceiverFactoryImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewayReceiverFactoryImpl.java b/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewayReceiverFactoryImpl.java
new file mode 100644
index 0000000..f3e03a8
--- /dev/null
+++ b/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewayReceiverFactoryImpl.java
@@ -0,0 +1,140 @@
+/*
+ * =========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved. 
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ * ========================================================================
+ */
+
+package com.gemstone.gemfire.internal.cache.wan;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.wan.GatewayReceiver;
+import com.gemstone.gemfire.cache.wan.GatewayReceiverFactory;
+import com.gemstone.gemfire.cache.wan.GatewayTransportFilter;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.distributed.internal.ResourceEvent;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.xmlcache.CacheCreation;
+import com.gemstone.gemfire.internal.cache.xmlcache.GatewayReceiverCreation;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+
+/**
+ * @author Suranjan Kumar
+ * @author Yogesh Mahajan
+ * 
+ * @since 7.0
+ */
+public class GatewayReceiverFactoryImpl implements GatewayReceiverFactory {
+
+  private int startPort = GatewayReceiver.DEFAULT_START_PORT;
+  
+  private int endPort = GatewayReceiver.DEFAULT_END_PORT;
+  
+  private int timeBetPings = GatewayReceiver.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS;
+
+  private int socketBuffSize = GatewayReceiver.DEFAULT_SOCKET_BUFFER_SIZE;
+
+  private String bindAdd= GatewayReceiver.DEFAULT_BIND_ADDRESS; 
+  
+  private String hostnameForSenders = GatewayReceiver.DEFAULT_HOSTNAME_FOR_SENDERS;  
+  
+  private boolean manualStart = GatewayReceiver.DEFAULT_MANUAL_START;
+
+  private List<GatewayTransportFilter> filters = new ArrayList<GatewayTransportFilter>();
+  
+  private Cache cache;
+
+  public GatewayReceiverFactoryImpl() {
+    
+  }
+  public GatewayReceiverFactoryImpl(Cache cache) {
+   this.cache = cache;
+  }
+  
+  public GatewayReceiverFactory addGatewayTransportFilter(
+      GatewayTransportFilter filter) {
+    this.filters.add(filter);
+    return this;
+  }
+
+  public GatewayReceiverFactory removeGatewayTransportFilter(
+      GatewayTransportFilter filter) {
+    this.filters.remove(filter);
+    return this;
+  }
+
+  public GatewayReceiverFactory setMaximumTimeBetweenPings(int time) {
+    this.timeBetPings = time;
+    return this;
+  }
+
+  public GatewayReceiverFactory setStartPort(int port) {
+    this.startPort = port;
+    return this;
+  }
+  
+  public GatewayReceiverFactory setEndPort(int port) {
+    this.endPort = port;
+    return this;
+  }
+  
+  public GatewayReceiverFactory setSocketBufferSize(int size) {
+    this.socketBuffSize = size;
+    return this;
+  }
+
+  public GatewayReceiverFactory setBindAddress(String address) {
+    this.bindAdd = address;
+    return this;
+  }
+  
+  public GatewayReceiverFactory setHostnameForSenders(String address) {
+    this.hostnameForSenders = address;
+    return this;
+  } 
+
+  public GatewayReceiverFactory setManualStart(boolean start) {
+    this.manualStart = start;
+    return this;
+  }
+  
+  public GatewayReceiver create() {
+    if (this.startPort > this.endPort) {
+      throw new IllegalStateException(
+          "Please specify either start port a value which is less than end port.");
+    }
+    GatewayReceiver recv = null;
+    if (this.cache instanceof GemFireCacheImpl) {
+      recv = new GatewayReceiverImpl(this.cache, this.startPort, this.endPort,
+          this.timeBetPings, this.socketBuffSize, this.bindAdd, this.filters,
+          this.hostnameForSenders, this.manualStart);
+      ((GemFireCacheImpl)cache).addGatewayReceiver(recv);
+      InternalDistributedSystem system = (InternalDistributedSystem) this.cache
+      .getDistributedSystem();
+      system.handleResourceEvent(ResourceEvent.GATEWAYRECEIVER_CREATE, recv);
+      if (!this.manualStart) {
+        try {
+          recv.start();
+        }
+        catch (IOException ioe) {
+          throw new GatewayReceiverException(
+              LocalizedStrings.GatewayReceiver_EXCEPTION_WHILE_STARTING_GATEWAY_RECEIVER
+                  .toLocalizedString(), ioe);
+        }
+      }
+    } else if (this.cache instanceof CacheCreation) {
+      recv = new GatewayReceiverCreation(this.cache, this.startPort, this.endPort,
+          this.timeBetPings, this.socketBuffSize, this.bindAdd, this.filters,
+          this.hostnameForSenders, this.manualStart);
+      ((CacheCreation)cache).addGatewayReceiver(recv);
+    }
+    return recv;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewayReceiverImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewayReceiverImpl.java b/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewayReceiverImpl.java
new file mode 100644
index 0000000..0a14e2e
--- /dev/null
+++ b/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewayReceiverImpl.java
@@ -0,0 +1,245 @@
+/*
+ * =========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved. 
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ * ========================================================================
+ */
+package com.gemstone.gemfire.internal.cache.wan;
+
+import java.io.IOException;
+import java.net.BindException;
+import java.net.SocketException;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.server.CacheServer;
+import com.gemstone.gemfire.cache.wan.GatewayReceiver;
+import com.gemstone.gemfire.cache.wan.GatewayTransportFilter;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.distributed.internal.ResourceEvent;
+import com.gemstone.gemfire.internal.AvailablePort;
+import com.gemstone.gemfire.internal.SocketCreator;
+import com.gemstone.gemfire.internal.cache.CacheServerImpl;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+
+/**
+ * @author Suranjan Kumar
+ * @author Yogesh Mahajan
+ * 
+ * @since 7.0
+ */
+public class GatewayReceiverImpl implements GatewayReceiver {
+
+  private static final Logger logger = LogService.getLogger();
+  
+  private String host;
+
+  private int startPort;
+  
+  private int endPort;
+  
+  private int port;
+
+  private int timeBetPings;
+
+  private int socketBufferSize;
+  
+  private boolean manualStart;
+
+  private final List<GatewayTransportFilter> filters;
+
+  private String bindAdd;
+  
+  private CacheServer receiver;
+
+  private final GemFireCacheImpl cache;
+  
+  public GatewayReceiverImpl(Cache cache, int startPort,
+      int endPort, int timeBetPings, int buffSize, String bindAdd,
+      List<GatewayTransportFilter> filters, String hostnameForSenders, boolean manualStart) {
+    this.cache = (GemFireCacheImpl)cache;
+    
+    /*
+     * If user has set hostNameForSenders then it should take precedence over
+     * bindAddress. If user hasn't set either hostNameForSenders or bindAddress
+     * then getLocalHost().getHostName() should be used.
+     */
+    if (hostnameForSenders == null || hostnameForSenders.isEmpty()) {
+      if (bindAdd == null || bindAdd.isEmpty()) {
+        try {
+          logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayReceiverImpl_USING_LOCAL_HOST));
+          this.host = SocketCreator.getLocalHost().getHostName();
+        } catch (UnknownHostException e) {
+          throw new IllegalStateException(
+              LocalizedStrings.GatewayReceiverImpl_COULD_NOT_GET_HOST_NAME
+                  .toLocalizedString(),
+              e);
+        }
+      } else {
+        this.host = bindAdd;
+      }
+    } else {
+      this.host = hostnameForSenders;
+    }
+
+    this.startPort = startPort;
+    this.endPort = endPort;
+    this.timeBetPings = timeBetPings;
+    this.socketBufferSize = buffSize;
+    this.bindAdd = bindAdd;
+    this.filters = filters;
+    this.manualStart = manualStart;
+  }
+
+  public List<GatewayTransportFilter> getGatewayTransportFilters() {
+    return this.filters;
+  }
+
+  public int getMaximumTimeBetweenPings() {
+    return this.timeBetPings;
+  }
+
+  public int getPort() {
+    return this.port;
+  }
+
+  public int getStartPort() {
+    return this.startPort;
+  }
+  
+  public int getEndPort() {
+    return this.endPort;
+  }
+  
+  public int getSocketBufferSize() {
+    return this.socketBufferSize;
+  }
+
+  public boolean isManualStart() {
+    return this.manualStart;
+  }
+  
+  public CacheServer getServer() {
+    return receiver;
+  }
+  
+  public void start() throws IOException {
+    if (receiver == null) {
+      receiver = this.cache.addCacheServer(true);
+    }
+    if (receiver.isRunning()) {
+      logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayReceiver_IS_ALREADY_RUNNING));
+      return;
+    }
+    boolean started = false;
+    this.port = getPortToStart();
+    while (!started && this.port != -1) {
+      receiver.setPort(this.port);
+      receiver.setSocketBufferSize(socketBufferSize);
+      receiver.setMaximumTimeBetweenPings(timeBetPings);
+      receiver.setHostnameForClients(host);
+      receiver.setBindAddress(bindAdd);
+      receiver.setGroups(new String[] { GatewayReceiverImpl.RECEIVER_GROUP });
+      ((CacheServerImpl)receiver).setGatewayTransportFilter(this.filters);
+      try {
+        ((CacheServerImpl)receiver).start();
+        started = true;
+      } catch (BindException be) {
+        if (be.getCause() != null
+            && be.getCause().getMessage()
+                .contains("assign requested address")) {
+          throw new GatewayReceiverException(
+              LocalizedStrings.SocketCreator_FAILED_TO_CREATE_SERVER_SOCKET_ON_0_1
+                  .toLocalizedString(new Object[] { bindAdd,
+                      Integer.valueOf(this.port) }));
+        }
+        // ignore as this port might have been used by other threads.
+        logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayReceiver_Address_Already_In_Use, this.port));
+        this.port = getPortToStart();
+      } catch (SocketException se) {
+        if (se.getMessage().contains("Address already in use")) {
+          logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayReceiver_Address_Already_In_Use, this.port));
+          this.port = getPortToStart();
+
+        } else {
+          throw se;
+        }
+      }
+      
+    }
+    if (!started) {
+      throw new IllegalStateException(
+          "No available free port found in the given range.");
+    }
+    logger.info(LocalizedMessage.create(LocalizedStrings.GatewayReceiver_STARTED_ON_PORT, this.port));
+
+    InternalDistributedSystem system = this.cache.getDistributedSystem();
+    system.handleResourceEvent(ResourceEvent.GATEWAYRECEIVER_START, this);
+
+  }
+  
+  private int getPortToStart(){
+    // choose a random port from the given port range
+    int rPort;
+    if (this.startPort == this.endPort) {
+      rPort = this.startPort;
+    } else {
+      rPort = AvailablePort.getRandomAvailablePortInRange(this.startPort,
+          this.endPort, AvailablePort.SOCKET);
+    }
+    return rPort;
+  }
+  
+  public void stop() {
+    if(!isRunning()){
+      throw new GatewayReceiverException(LocalizedStrings.GatewayReceiver_IS_NOT_RUNNING.toLocalizedString());
+    }
+    receiver.stop();
+
+//    InternalDistributedSystem system = ((GemFireCacheImpl) this.cache)
+//        .getDistributedSystem();
+//    system.handleResourceEvent(ResourceEvent.GATEWAYRECEIVER_STOP, this);
+
+  }
+
+  public String getHost() {
+    return this.host;
+  }
+
+  public String getBindAddress() {
+    return this.bindAdd;
+  }
+
+  public boolean isRunning() {
+    if (this.receiver != null) {
+      return this.receiver.isRunning();
+    }
+    return false;
+  }
+  
+  public String toString() {
+    return new StringBuffer()
+      .append("Gateway Receiver")
+      .append("@").append(Integer.toHexString(hashCode()))
+      .append(" [")
+      .append("host='").append(getHost())
+      .append("'; port=").append(getPort())
+      .append("; bindAddress=").append(getBindAddress())
+      .append("; maximumTimeBetweenPings=").append(getMaximumTimeBetweenPings())
+      .append("; socketBufferSize=").append(getSocketBufferSize())
+      .append("; isManualStart=").append(isManualStart())
+      .append("; group=").append(Arrays.toString(new String[]{GatewayReceiverImpl.RECEIVER_GROUP}))
+      .append("]")
+      .toString();
+  }
+   
+}


Mime
View raw message