geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hiteshkhame...@apache.org
Subject [58/61] [abbrv] incubator-geode git commit: GEODE-37 change package name from com.gemstone.gemfire (for ./geode-wan/src/main/java/com/gemstone/gemfire)to org.apache.geode for(to ./geode-wan/src/main/java/org/apache/geode)
Date Tue, 13 Sep 2016 22:44:46 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/AbstractRemoteGatewaySender.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/AbstractRemoteGatewaySender.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/AbstractRemoteGatewaySender.java
new file mode 100644
index 0000000..163a611
--- /dev/null
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/AbstractRemoteGatewaySender.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.client.PoolManager;
+import com.gemstone.gemfire.cache.client.internal.PoolImpl;
+import com.gemstone.gemfire.cache.client.internal.locator.wan.RemoteLocatorRequest;
+import com.gemstone.gemfire.cache.client.internal.locator.wan.RemoteLocatorResponse;
+import com.gemstone.gemfire.cache.wan.GatewayReceiver;
+import com.gemstone.gemfire.distributed.internal.WanLocatorDiscoverer;
+import com.gemstone.gemfire.distributed.internal.tcpserver.TcpClient;
+import com.gemstone.gemfire.internal.admin.remote.DistributionLocatorId;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.PoolFactoryImpl;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import org.apache.logging.log4j.Logger;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.util.Iterator;
+import java.util.StringTokenizer;
+
+public abstract class AbstractRemoteGatewaySender extends AbstractGatewaySender {
+  private static final Logger logger = LogService.getLogger();
+  
+  public AbstractRemoteGatewaySender() {
+    
+  }
+  public AbstractRemoteGatewaySender(Cache cache, GatewaySenderAttributes attrs){
+    super(cache, attrs);
+  }
+  
+  /** used to reduce warning logs in case remote locator is down (#47634) */ 
+  protected int proxyFailureTries = 0; 
+  
+  public synchronized void initProxy() {
+    // return if it is being used for WBCL or proxy is already created
+    if (this.remoteDSId == DEFAULT_DISTRIBUTED_SYSTEM_ID || this.proxy != null
+        && !this.proxy.isDestroyed()) {
+      return;
+    }
+
+    int locatorCount = 0;
+    PoolFactoryImpl pf = (PoolFactoryImpl) PoolManager.createFactory();
+    pf.setPRSingleHopEnabled(false);
+    if (this.locatorDiscoveryCallback != null) {
+      pf.setLocatorDiscoveryCallback(locatorDiscoveryCallback);
+    }
+    pf.setReadTimeout(this.socketReadTimeout);
+    pf.setIdleTimeout(connectionIdleTimeOut);
+    pf.setSocketBufferSize(socketBufferSize);
+    pf.setServerGroup(GatewayReceiver.RECEIVER_GROUP);
+    RemoteLocatorRequest request = new RemoteLocatorRequest(this.remoteDSId, pf
+        .getPoolAttributes().getServerGroup());
+    String locators = ((GemFireCacheImpl) this.cache).getDistributedSystem()
+        .getConfig().getLocators();
+    if (logger.isDebugEnabled()) {
+      logger.debug("Gateway Sender is attempting to configure pool with remote locator information");
+    }
+    StringTokenizer locatorsOnThisVM = new StringTokenizer(locators, ",");
+    while (locatorsOnThisVM.hasMoreTokens()) {
+      String localLocator = locatorsOnThisVM.nextToken();
+      DistributionLocatorId locatorID = new DistributionLocatorId(localLocator);
+      try {
+        RemoteLocatorResponse response = (RemoteLocatorResponse) new TcpClient()
+            .requestToServer(locatorID.getHost(), locatorID.getPort(), request,
+                WanLocatorDiscoverer.WAN_LOCATOR_CONNECTION_TIMEOUT);
+
+        if (response != null) {
+          if (response.getLocators() == null) {
+            if (logProxyFailure()) {
+              logger.warn(LocalizedMessage.create(
+                  LocalizedStrings.AbstractGatewaySender_REMOTE_LOCATOR_FOR_REMOTE_SITE_0_IS_NOT_AVAILABLE_IN_LOCAL_LOCATOR_1,
+                      new Object[] { remoteDSId, localLocator }));
+            }
+            continue;
+          }
+          if (logger.isDebugEnabled()) {
+            logger.debug("Received the remote site {} location information:", this.remoteDSId, response.getLocators());
+          }
+          StringBuffer strBuffer = new StringBuffer();
+          Iterator<String> itr = response.getLocators().iterator();
+          while (itr.hasNext()) {
+            DistributionLocatorId locatorId = new DistributionLocatorId(itr.next());
+            pf.addLocator(locatorId.getHost().getHostName(), locatorId.getPort());
+            locatorCount++;
+          }
+          break;
+        }
+      } catch (IOException ioe) {
+        if (logProxyFailure()) {
+          // don't print stack trace for connection failures
+          String ioeStr = "";
+          if (!logger.isDebugEnabled() && ioe instanceof ConnectException) {
+            ioeStr = ": " + ioe.toString();
+            ioe = null;
+          }
+        logger.warn(LocalizedMessage.create(
+            LocalizedStrings.AbstractGatewaySender_SENDER_0_IS_NOT_ABLE_TO_CONNECT_TO_LOCAL_LOCATOR_1,
+                new Object[] { this.id, localLocator + ioeStr  }), ioe);        
+        }
+        continue;
+      } catch (ClassNotFoundException e) {
+        if (logProxyFailure()) {
+          logger.warn(LocalizedMessage.create(
+              LocalizedStrings.AbstractGatewaySender_SENDER_0_IS_NOT_ABLE_TO_CONNECT_TO_LOCAL_LOCATOR_1,
+                  new Object[] { this.id, localLocator }), e);
+        }
+        continue;
+      }
+    }
+
+    if (locatorCount == 0) {
+      if (logProxyFailure()) {
+        logger.fatal(LocalizedMessage.create(
+            LocalizedStrings.AbstractGatewaySender_SENDER_0_COULD_NOT_GET_REMOTE_LOCATOR_INFORMATION_FOR_SITE_1,
+                new Object[] { this.id, this.remoteDSId }));
+      }
+      this.proxyFailureTries++;
+      throw new GatewaySenderConfigurationException(
+          LocalizedStrings.AbstractGatewaySender_SENDER_0_COULD_NOT_GET_REMOTE_LOCATOR_INFORMATION_FOR_SITE_1
+              .toLocalizedString(new Object[] { this.id, this.remoteDSId}));
+    }
+    pf.init(this);
+    this.proxy = ((PoolImpl) pf.create(this.getId()));
+    if (this.proxyFailureTries > 0) {
+      logger.info(LocalizedMessage.create(LocalizedStrings.AbstractGatewaySender_SENDER_0_GOT_REMOTE_LOCATOR_INFORMATION_FOR_SITE_1,
+              new Object[] { this.id, this.remoteDSId, this.proxyFailureTries }));
+      this.proxyFailureTries = 0;
+    }
+  }
+  
+  protected boolean logProxyFailure() {
+    assert Thread.holdsLock(this);
+    // always log the first failure
+    if (logger.isDebugEnabled() || this.proxyFailureTries == 0) {
+      return true;
+    } else {
+      // subsequent failures will be logged on 30th, 300th, 3000th try
+      // each try is at 100millis from higher layer so this accounts for logging
+      // after 3s, 30s and then every 5mins
+      if (this.proxyFailureTries >= 3000) {
+        return (this.proxyFailureTries % 3000) == 0;
+      } else {
+        return (this.proxyFailureTries == 30 || this.proxyFailureTries == 300);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverFactoryImpl.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverFactoryImpl.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverFactoryImpl.java
new file mode 100644
index 0000000..c5c2a2f
--- /dev/null
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverFactoryImpl.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.wan.GatewayReceiver;
+import com.gemstone.gemfire.cache.wan.GatewayReceiverFactory;
+import com.gemstone.gemfire.cache.wan.GatewayTransportFilter;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.distributed.internal.ResourceEvent;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.xmlcache.CacheCreation;
+import com.gemstone.gemfire.internal.cache.xmlcache.GatewayReceiverCreation;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+
+/**
+ * 
+ * @since GemFire 7.0
+ */
+public class GatewayReceiverFactoryImpl implements GatewayReceiverFactory {
+
+  private int startPort = GatewayReceiver.DEFAULT_START_PORT;
+  
+  private int endPort = GatewayReceiver.DEFAULT_END_PORT;
+  
+  private int timeBetPings = GatewayReceiver.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS;
+
+  private int socketBuffSize = GatewayReceiver.DEFAULT_SOCKET_BUFFER_SIZE;
+
+  private String bindAdd= GatewayReceiver.DEFAULT_BIND_ADDRESS; 
+  
+  private String hostnameForSenders = GatewayReceiver.DEFAULT_HOSTNAME_FOR_SENDERS;  
+  
+  private boolean manualStart = GatewayReceiver.DEFAULT_MANUAL_START;
+
+  private List<GatewayTransportFilter> filters = new ArrayList<GatewayTransportFilter>();
+  
+  private Cache cache;
+
+  public GatewayReceiverFactoryImpl() {
+    
+  }
+  public GatewayReceiverFactoryImpl(Cache cache) {
+   this.cache = cache;
+  }
+  
+  public GatewayReceiverFactory addGatewayTransportFilter(
+      GatewayTransportFilter filter) {
+    this.filters.add(filter);
+    return this;
+  }
+
+  public GatewayReceiverFactory removeGatewayTransportFilter(
+      GatewayTransportFilter filter) {
+    this.filters.remove(filter);
+    return this;
+  }
+
+  public GatewayReceiverFactory setMaximumTimeBetweenPings(int time) {
+    this.timeBetPings = time;
+    return this;
+  }
+
+  public GatewayReceiverFactory setStartPort(int port) {
+    this.startPort = port;
+    return this;
+  }
+  
+  public GatewayReceiverFactory setEndPort(int port) {
+    this.endPort = port;
+    return this;
+  }
+  
+  public GatewayReceiverFactory setSocketBufferSize(int size) {
+    this.socketBuffSize = size;
+    return this;
+  }
+
+  public GatewayReceiverFactory setBindAddress(String address) {
+    this.bindAdd = address;
+    return this;
+  }
+  
+  public GatewayReceiverFactory setHostnameForSenders(String address) {
+    this.hostnameForSenders = address;
+    return this;
+  } 
+
+  public GatewayReceiverFactory setManualStart(boolean start) {
+    this.manualStart = start;
+    return this;
+  }
+  
+  public GatewayReceiver create() {
+    if (this.startPort > this.endPort) {
+      throw new IllegalStateException(
+          "Please specify either start port a value which is less than end port.");
+    }
+    GatewayReceiver recv = null;
+    if (this.cache instanceof GemFireCacheImpl) {
+      recv = new GatewayReceiverImpl(this.cache, this.startPort, this.endPort,
+          this.timeBetPings, this.socketBuffSize, this.bindAdd, this.filters,
+          this.hostnameForSenders, this.manualStart);
+      ((GemFireCacheImpl)cache).addGatewayReceiver(recv);
+      InternalDistributedSystem system = (InternalDistributedSystem) this.cache
+      .getDistributedSystem();
+      system.handleResourceEvent(ResourceEvent.GATEWAYRECEIVER_CREATE, recv);
+      if (!this.manualStart) {
+        try {
+          recv.start();
+        }
+        catch (IOException ioe) {
+          throw new GatewayReceiverException(
+              LocalizedStrings.GatewayReceiver_EXCEPTION_WHILE_STARTING_GATEWAY_RECEIVER
+                  .toLocalizedString(), ioe);
+        }
+      }
+    } else if (this.cache instanceof CacheCreation) {
+      recv = new GatewayReceiverCreation(this.cache, this.startPort, this.endPort,
+          this.timeBetPings, this.socketBuffSize, this.bindAdd, this.filters,
+          this.hostnameForSenders, this.manualStart);
+      ((CacheCreation)cache).addGatewayReceiver(recv);
+    }
+    return recv;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverImpl.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverImpl.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverImpl.java
new file mode 100644
index 0000000..b8768d4
--- /dev/null
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverImpl.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan;
+
+import java.io.IOException;
+import java.net.BindException;
+import java.net.SocketException;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.server.CacheServer;
+import com.gemstone.gemfire.cache.wan.GatewayReceiver;
+import com.gemstone.gemfire.cache.wan.GatewayTransportFilter;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.distributed.internal.ResourceEvent;
+import com.gemstone.gemfire.internal.AvailablePort;
+import com.gemstone.gemfire.internal.net.SocketCreator;
+import com.gemstone.gemfire.internal.cache.CacheServerImpl;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+
+/**
+ * @since GemFire 7.0
+ */
+@SuppressWarnings("deprecation")
+public class GatewayReceiverImpl implements GatewayReceiver {
+
+  private static final Logger logger = LogService.getLogger();
+  
+  private String host;
+
+  private int startPort;
+  
+  private int endPort;
+  
+  private int port;
+
+  private int timeBetPings;
+
+  private int socketBufferSize;
+  
+  private boolean manualStart;
+
+  private final List<GatewayTransportFilter> filters;
+
+  private String bindAdd;
+  
+  private CacheServer receiver;
+
+  private final GemFireCacheImpl cache;
+  
+  public GatewayReceiverImpl(Cache cache, int startPort,
+      int endPort, int timeBetPings, int buffSize, String bindAdd,
+      List<GatewayTransportFilter> filters, String hostnameForSenders, boolean manualStart) {
+    this.cache = (GemFireCacheImpl)cache;
+    
+    /*
+     * If user has set hostNameForSenders then it should take precedence over
+     * bindAddress. If user hasn't set either hostNameForSenders or bindAddress
+     * then getLocalHost().getHostName() should be used.
+     */
+    if (hostnameForSenders == null || hostnameForSenders.isEmpty()) {
+      if (bindAdd == null || bindAdd.isEmpty()) {
+        try {
+          logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayReceiverImpl_USING_LOCAL_HOST));
+          this.host = SocketCreator.getLocalHost().getHostName();
+        } catch (UnknownHostException e) {
+          throw new IllegalStateException(
+              LocalizedStrings.GatewayReceiverImpl_COULD_NOT_GET_HOST_NAME
+                  .toLocalizedString(),
+              e);
+        }
+      } else {
+        this.host = bindAdd;
+      }
+    } else {
+      this.host = hostnameForSenders;
+    }
+
+    this.startPort = startPort;
+    this.endPort = endPort;
+    this.timeBetPings = timeBetPings;
+    this.socketBufferSize = buffSize;
+    this.bindAdd = bindAdd;
+    this.filters = filters;
+    this.manualStart = manualStart;
+  }
+
+  public List<GatewayTransportFilter> getGatewayTransportFilters() {
+    return this.filters;
+  }
+
+  public int getMaximumTimeBetweenPings() {
+    return this.timeBetPings;
+  }
+
+  public int getPort() {
+    return this.port;
+  }
+
+  public int getStartPort() {
+    return this.startPort;
+  }
+  
+  public int getEndPort() {
+    return this.endPort;
+  }
+  
+  public int getSocketBufferSize() {
+    return this.socketBufferSize;
+  }
+
+  public boolean isManualStart() {
+    return this.manualStart;
+  }
+  
+  public CacheServer getServer() {
+    return receiver;
+  }
+  
+  /** Starts the receiver's cache server on a port from the configured range; throws IllegalStateException if none can be bound. */
+  public void start() throws IOException {
+    if (receiver == null) {
+      receiver = this.cache.addCacheServer(true);
+    }
+    if (receiver.isRunning()) {
+      logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayReceiver_IS_ALREADY_RUNNING));
+      return;
+    }
+    boolean started = false;
+    this.port = getPortToStart();
+    // try candidate ports until one binds; -1 means there is nothing left to try
+    while (!started && this.port != -1) {
+      receiver.setPort(this.port);
+      receiver.setSocketBufferSize(socketBufferSize);
+      receiver.setMaximumTimeBetweenPings(timeBetPings);
+      receiver.setHostnameForClients(host);
+      receiver.setBindAddress(bindAdd);
+      receiver.setGroups(new String[] { GatewayReceiverImpl.RECEIVER_GROUP });
+      ((CacheServerImpl)receiver).setGatewayTransportFilter(this.filters);
+      try {
+        ((CacheServerImpl)receiver).start();
+        started = true;
+      } catch (BindException be) {
+        if (be.getCause() != null
+            && be.getCause().getMessage() != null
+            && be.getCause().getMessage().contains("assign requested address")) {
+          throw new GatewayReceiverException(
+              LocalizedStrings.SocketCreator_FAILED_TO_CREATE_SERVER_SOCKET_ON_0_1
+                  .toLocalizedString(new Object[] { bindAdd,
+                      Integer.valueOf(this.port) }), be);
+        }
+        // ignore as this port might have been used by other threads.
+        logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayReceiver_Address_Already_In_Use, this.port));
+        // a single-port range has no other candidate: give up instead of retrying the same port forever
+        this.port = (this.startPort == this.endPort) ? -1 : getPortToStart();
+      } catch (SocketException se) {
+        if (se.getMessage() != null && se.getMessage().contains("Address already in use")) {
+          logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayReceiver_Address_Already_In_Use, this.port));
+          this.port = (this.startPort == this.endPort) ? -1 : getPortToStart();
+        } else {
+          throw se;
+        }
+      }
+    }
+    if (!started) {
+      throw new IllegalStateException(
+          "No available free port found in the given range.");
+    }
+    logger.info(LocalizedMessage.create(LocalizedStrings.GatewayReceiver_STARTED_ON_PORT, this.port));
+
+    InternalDistributedSystem system = this.cache.getDistributedSystem();
+    system.handleResourceEvent(ResourceEvent.GATEWAYRECEIVER_START, this);
+  }
+  
+  /**
+   * Picks the port to try next: the configured port when the range holds a
+   * single value, otherwise a random port chosen from the given port range.
+   */
+  private int getPortToStart() {
+    if (this.startPort == this.endPort) {
+      return this.startPort;
+    }
+    return AvailablePort.getRandomAvailablePortInRange(this.startPort,
+        this.endPort, AvailablePort.SOCKET);
+  }
+  
+  /** Stops the underlying cache server; throws GatewayReceiverException if this receiver is not running. */
+  public void stop() {
+    if(!isRunning()){
+      throw new GatewayReceiverException(LocalizedStrings.GatewayReceiver_IS_NOT_RUNNING.toLocalizedString());
+    }
+    receiver.stop();
+    // NOTE(review): unlike start(), no resource event is raised on stop; the call below is disabled - confirm intended.
+//    InternalDistributedSystem system = ((GemFireCacheImpl) this.cache)
+//        .getDistributedSystem();
+//    system.handleResourceEvent(ResourceEvent.GATEWAYRECEIVER_STOP, this);
+  }
+
+  public String getHost() {
+    return this.host;
+  }
+
+  public String getBindAddress() {
+    return this.bindAdd;
+  }
+
+  public boolean isRunning() {
+    if (this.receiver != null) {
+      return this.receiver.isRunning();
+    }
+    return false;
+  }
+  
+  /** Returns a one-line description of this receiver: identity hash, host, port and configured settings. */
+  public String toString() {
+    final StringBuilder text = new StringBuilder();
+    text.append("Gateway Receiver");
+    text.append("@").append(Integer.toHexString(hashCode()));
+    text.append(" [").append("host='").append(getHost());
+    text.append("'; port=").append(getPort());
+    text.append("; bindAddress=").append(getBindAddress());
+    text.append("; maximumTimeBetweenPings=").append(getMaximumTimeBetweenPings());
+    text.append("; socketBufferSize=").append(getSocketBufferSize());
+    text.append("; isManualStart=").append(isManualStart());
+    text.append("; group=").append(Arrays.toString(new String[]{GatewayReceiverImpl.RECEIVER_GROUP}));
+    text.append("]");
+    return text.toString();
+  }
+   
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
new file mode 100644
index 0000000..d2302c4
--- /dev/null
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
@@ -0,0 +1,802 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan;
+
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.gemstone.gemfire.GemFireIOException;
+import com.gemstone.gemfire.internal.cache.tier.sockets.MessageTooLargeException;
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.CancelException;
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.RegionDestroyedException;
+import com.gemstone.gemfire.cache.client.ServerConnectivityException;
+import com.gemstone.gemfire.cache.client.ServerOperationException;
+import com.gemstone.gemfire.cache.client.internal.Connection;
+import com.gemstone.gemfire.cache.client.internal.pooling.ConnectionDestroyedException;
+import com.gemstone.gemfire.cache.wan.GatewaySender;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.UpdateAttributesProcessor;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import com.gemstone.gemfire.pdx.PdxRegistryMismatchException;
+import com.gemstone.gemfire.security.GemFireSecurityException;
+import com.gemstone.gemfire.cache.client.internal.SenderProxy;
+
+/**
+ * @since GemFire 7.0
+ *
+ */
+public class GatewaySenderEventRemoteDispatcher implements
+    GatewaySenderEventDispatcher {
+
+  private static final Logger logger = LogService.getLogger();
+  
+  private final AbstractGatewaySenderEventProcessor processor;
+
+  private volatile Connection connection;
+
+  private final Set<String> notFoundRegions = new HashSet<String>();
+  
+  private final Object notFoundRegionsSync = new Object();
+  
+  private final AbstractGatewaySender sender;
+  
+  private AckReaderThread ackReaderThread;
+  
+  private ReentrantReadWriteLock connectionLifeCycleLock = new ReentrantReadWriteLock();
+  
+  /**
+   * This count is reset to 0 each time a successful connection is made.
+   */
+  private int failedConnectCount = 0;
+  
+  public GatewaySenderEventRemoteDispatcher(AbstractGatewaySenderEventProcessor eventProcessor) {
+    this.processor = eventProcessor;
+    this.sender = eventProcessor.getSender();
+//    this.ackReaderThread = new AckReaderThread(sender);
+    try {
+      initializeConnection();
+    }
+    catch (GatewaySenderException e) {
+      if (e.getCause() instanceof GemFireSecurityException) {
+        throw e; // only failures caused by a security exception abort construction
+      }
+    } // any other GatewaySenderException is swallowed here, so construction still succeeds
+  }
+  
+  /** Reads one ack over the current connection; returns null if there is no connection or an exception occurred. */
+  protected GatewayAck readAcknowledgement() {
+    SenderProxy sp = new SenderProxy(this.processor.getSender().getProxy());
+    GatewayAck ack = null;
+    Exception ex;
+    try {
+      connection = getConnection(false);
+      if (logger.isDebugEnabled()) {
+        logger.debug(" Receiving ack on the thread {}", connection);
+      }
+      this.connectionLifeCycleLock.readLock().lock();
+      try {
+        if (connection != null) {
+          ack = (GatewayAck)sp.receiveAckFromReceiver(connection);
+        }
+      } finally {
+        this.connectionLifeCycleLock.readLock().unlock();
+      }
+    } catch (Exception e) {
+      Throwable t = e.getCause();
+      if (t instanceof BatchException70) {
+        // A BatchException has occurred.
+        // Do not process the connection as dead since it is not dead.
+        ex = (BatchException70)t;
+      } else if (e instanceof GatewaySenderException) { //This Exception is thrown from getConnection
+        ex = (Exception) e.getCause();
+      }else {
+        ex = e;
+        // keep using the connection if we had a batch exception. Else, destroy
+        // it
+        destroyConnection();
+      }
+      if (this.sender.getProxy() == null || this.sender.getProxy().isDestroyed()) {
+        // if our pool is shutdown then just be silent
+      } else if (ex instanceof IOException
+          || (ex instanceof ServerConnectivityException && !(ex.getCause() instanceof PdxRegistryMismatchException))
+          || ex instanceof ConnectionDestroyedException) {
+        // If the cause is an IOException or a ServerException, sleep and retry.
+        // Sleep for a bit and recheck.
+        try {
+          Thread.sleep(100);
+        } catch (InterruptedException ie) {
+          Thread.currentThread().interrupt();
+        }
+      } else {
+        if (!(ex instanceof CancelException)) {
+          logger.fatal(LocalizedMessage.create(
+                  LocalizedStrings.GatewayEventRemoteDispatcher_STOPPING_THE_PROCESSOR_BECAUSE_THE_FOLLOWING_EXCEPTION_OCCURRED_WHILE_PROCESSING_A_BATCH),
+                  ex);
+        }
+        this.processor.setIsStopped(true);
+      }
+    }
+    return ack;
+  }
+  
+  /** Sends the batch via _dispatchBatch and ends the batch statistics on success; returns whether it was dispatched. */
+  @Override
+  public boolean dispatchBatch(List events, boolean isRetry) {
+    GatewaySenderStats statistics = this.sender.getStatistics();
+    boolean success = false;
+    try {
+      long start = statistics.startTime();
+      success =_dispatchBatch(events, isRetry);
+      if (success) {
+        statistics.endBatch(start, events.size());
+      }
+    } catch (GatewaySenderException ge) {
+      Throwable t = ge.getCause();
+      if (this.sender.getProxy() == null || this.sender.getProxy().isDestroyed()) {
+        // if our pool is shutdown then just be silent
+      } else if (t instanceof IOException
+          || t instanceof ServerConnectivityException
+          || t instanceof ConnectionDestroyedException
+          || t instanceof MessageTooLargeException
+          || t instanceof IllegalStateException) {
+        this.processor.handleException();
+        // If the cause is an IOException or a ServerException, sleep and retry.
+        // Sleep for a bit and recheck.
+        try {
+          Thread.sleep(100);
+        } catch (InterruptedException ie) {
+          Thread.currentThread().interrupt();
+        }
+        if (logger.isDebugEnabled()) {
+          logger.debug("Because of IOException, failed to dispatch a batch with id : {}", this.processor.getBatchId());
+        }
+      }
+      else {
+        logger.fatal(LocalizedMessage.create(
+            LocalizedStrings.GatewayEventRemoteDispatcher_STOPPING_THE_PROCESSOR_BECAUSE_THE_FOLLOWING_EXCEPTION_OCCURRED_WHILE_PROCESSING_A_BATCH), ge);
+        this.processor.setIsStopped(true);
+      }
+    }
+    catch (CancelException e) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("Stopping the processor because cancellation occurred while processing a batch");
+      }
+      this.processor.setIsStopped(true);
+      throw e;
+    } catch (Exception e) {
+      this.processor.setIsStopped(true);
+      logger.fatal(LocalizedMessage.create(
+              LocalizedStrings.GatewayEventRemoteDispatcher_STOPPING_THE_PROCESSOR_BECAUSE_THE_FOLLOWING_EXCEPTION_OCCURRED_WHILE_PROCESSING_A_BATCH),
+              e);
+    }
+    return success;
+  }
+
+  /**
+   * Sends one batch of events to the remote site on this dispatcher's connection.
+   * <p>
+   * The connection is (re)acquired through {@link #getConnection(boolean)} first. If the
+   * processor's batch id changed across that call, or the processor reports a connection
+   * reset, nothing is sent and <code>false</code> is returned. Otherwise the batch is
+   * dispatched through a <code>SenderProxy</code> while holding the read side of
+   * <code>connectionLifeCycleLock</code>.
+   * <p>
+   * Every failure is rethrown as a <code>GatewaySenderException</code>. The connection is
+   * kept when the failure is a <code>BatchException70</code> or a
+   * <code>MessageTooLargeException</code> (the latter also halves the batch size) and when
+   * an <code>IllegalStateException</code> is caught; all other failures destroy it.
+   *
+   * @param events the events that make up the batch
+   * @param isRetry retry flag forwarded to the proxy; forced to <code>true</code> when the
+   *        processor reports a connection reset
+   * @return <code>true</code> if the batch was handed to the connection, <code>false</code>
+   *         if dispatch was skipped because the connection was reset
+   */
+  private boolean _dispatchBatch(List events, boolean isRetry) {
+    Exception ex = null;
+    // Read the batch id before and after getConnection(): initializing a connection can
+    // reset the processor's batch id (see initializeConnection()).
+    int currentBatchId = this.processor.getBatchId();
+    connection = getConnection(true);
+    int batchIdForThisConnection = this.processor.getBatchId();
+    GatewaySenderStats statistics = this.sender.getStatistics();
+    // This means we are writing to a new connection than the previous batch.
+    // i.e The connection has been reset. It also resets the batchId.
+    if (currentBatchId != batchIdForThisConnection
+        || this.processor.isConnectionReset()) {
+      return false;
+    }
+    try {
+      // Re-checked after the guard above; the flag is read again here, so a reset reported
+      // in between still marks this dispatch as a retry.
+      if (this.processor.isConnectionReset()) {
+        isRetry = true;
+      }
+      SenderProxy sp = new SenderProxy(this.sender.getProxy());
+      // destroyConnection() and initializeConnection() take the write lock, so the
+      // connection field cannot be swapped while the batch is being written.
+      this.connectionLifeCycleLock.readLock().lock();
+      try {
+        if (connection != null) {
+          sp.dispatchBatch_NewWAN(connection, events, currentBatchId, isRetry);
+          if (logger.isDebugEnabled()) {
+            logger.debug("{} : Dispatched batch (id={}) of {} events, queue size: {} on connection {}",
+                this.processor.getSender(), currentBatchId,  events.size(), this.processor.getQueue().size(), connection);
+          }
+        } else {
+          throw new ConnectionDestroyedException();
+        }
+      }
+      finally{
+        this.connectionLifeCycleLock.readLock().unlock();
+      }
+      return true;
+    }
+    catch (ServerOperationException e) {
+      Throwable t = e.getCause();
+      if (t instanceof BatchException70) {
+        // A BatchException has occurred.
+        // Do not process the connection as dead since it is not dead.
+        ex = (BatchException70)t;
+      }
+      else {
+        ex = e;
+        // keep using the connection if we had a batch exception. Else, destroy it
+        destroyConnection();
+      }
+      // NOTE(review): after destroyConnection() the connection field is null, so the
+      // message built below reports "null" as the connection on that path.
+      throw new GatewaySenderException(
+          LocalizedStrings.GatewayEventRemoteDispatcher_0_EXCEPTION_DURING_PROCESSING_BATCH_1_ON_CONNECTION_2.toLocalizedString(
+              new Object[] {this, Integer.valueOf(currentBatchId), connection}), ex);
+    }
+    catch (GemFireIOException e) {
+      Throwable t = e.getCause();
+      if (t instanceof MessageTooLargeException) {
+        // A MessageTooLargeException has occurred.
+        // Do not process the connection as dead since it is not dead.
+        ex = (MessageTooLargeException)t;
+        // Reduce the batch size by half of the configured batch size or number of events in the current batch (whichever is less)
+        int newBatchSize = Math.min(events.size(), this.processor.getBatchSize())/2;
+        logger.warn(LocalizedMessage.create(
+            LocalizedStrings.GatewaySenderEventRemoteDispatcher_MESSAGE_TOO_LARGE_EXCEPTION, new Object[] { events.size(), newBatchSize }), e);
+        this.processor.setBatchSize(newBatchSize);
+        statistics.incBatchesResized();
+      }
+      else {
+        ex = e;
+        // keep using the connection if we had a MessageTooLargeException. Else, destroy it
+        destroyConnection();
+      }
+      throw new GatewaySenderException(
+          LocalizedStrings.GatewayEventRemoteDispatcher_0_EXCEPTION_DURING_PROCESSING_BATCH_1_ON_CONNECTION_2.toLocalizedString(
+              new Object[] {this, Integer.valueOf(currentBatchId), connection}), ex);
+    }
+    catch (IllegalStateException e) {
+      // Recorded on the processor as well as rethrown; the connection is left untouched.
+      this.processor.setException(new GatewaySenderException(e));
+      throw new GatewaySenderException(
+          LocalizedStrings.GatewayEventRemoteDispatcher_0_EXCEPTION_DURING_PROCESSING_BATCH_1_ON_CONNECTION_2.toLocalizedString(
+              new Object[] {this, Integer.valueOf(currentBatchId), connection}), e);
+    }
+    catch (Exception e) {
+      // An Exception has occurred. Get its cause.
+      Throwable t = e.getCause();
+      if (t instanceof IOException) {
+        // An IOException has occurred.
+        ex = (IOException)t;
+      } else {
+        ex = e;
+      }
+      //the cause is not going to be BatchException70. So, destroy the connection
+      destroyConnection();
+      
+      throw new GatewaySenderException(
+          LocalizedStrings.GatewayEventRemoteDispatcher_0_EXCEPTION_DURING_PROCESSING_BATCH_1_ON_CONNECTION_2.toLocalizedString(
+              new Object[] {this, Integer.valueOf(currentBatchId), connection}), ex);
+    }
+  }
+  
+  /**
+   * Acquires or adds a new <code>Connection</code> to the corresponding
+   * <code>Gateway</code>
+   *
+   * @return the <code>Connection</code>
+   *
+   * @throws GatewaySenderException
+   */
+  public Connection getConnection(boolean startAckReaderThread) throws GatewaySenderException{
+    if (this.processor.isStopped()) {
+      return null;
+    }
+    // IF the connection is null 
+    // OR the connection's ServerLocation doesn't match with the one stored in sender
+    // THEN initialize the connection
+    if(!this.sender.isParallel()) {
+      if (this.connection == null || this.connection.isDestroyed()
+          || !this.connection.getServer().equals(this.sender.getServerLocation())) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("Initializing new connection as serverLocation of old connection is : {} and the serverLocation to connect is {}",
+              ((this.connection == null) ? "null" : this.connection.getServer()),
+              this.sender.getServerLocation());
+        }
+        // Initialize the connection
+        initializeConnection();
+      }
+    } else {
+      if (this.connection == null || this.connection.isDestroyed()) {
+        initializeConnection();
+      }
+    }
+    
+    // Here we might wait on a connection to another server if I was secondary
+    // so don't start waiting until I am primary
+    Cache cache = this.sender.getCache();
+    if (cache != null && !cache.isClosed()) {
+      if (this.sender.isPrimary() && (this.connection != null)) {
+        if (this.ackReaderThread == null || !this.ackReaderThread.isRunning()) {
+          this.ackReaderThread = new AckReaderThread(this.sender, this.processor);
+          this.ackReaderThread.start();
+          this.ackReaderThread.waitForRunningAckReaderThreadRunningState();
+        }
+      }
+    }
+    return this.connection;
+  }
+  
+  /**
+   * Destroys the current connection, if any, and forgets both it and the sender's server
+   * location so that the next caller obtains a fresh connection. Runs under the write side
+   * of <code>connectionLifeCycleLock</code>.
+   */
+  public void destroyConnection() {
+    this.connectionLifeCycleLock.writeLock().lock();
+    try {
+      final Connection current = this.connection;
+      if (current == null) {
+        return;
+      }
+      if (!current.isDestroyed()) {
+        current.destroy();
+        this.sender.getProxy().returnConnection(current);
+      }
+      // Clear the references even if the connection had already been destroyed elsewhere.
+      this.connection = null;
+      this.sender.setServerLocation(null);
+    }
+    finally {
+      this.connectionLifeCycleLock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Initializes the <code>Connection</code> while holding the write side of
+   * <code>connectionLifeCycleLock</code>.
+   * <p>
+   * The sender's proxy is created when it is missing or destroyed; otherwise the
+   * processor's batch id is reset. A parallel sender acquires any connection from the
+   * proxy. A serial sender, under the sender's concurrent-dispatcher lock, connects to the
+   * server location recorded in the sender or, when none is recorded, acquires any
+   * connection and - if this member is primary - records and distributes the new location.
+   * <p>
+   * On success the connection is stored in the <code>connection</code> field, the
+   * failure counter is cleared and the processor is asked to check whether PDX events
+   * need to be resent.
+   *
+   * @throws GatewaySenderException if no connection could be obtained, wrapping either the
+   *         security cause, an <code>IOException</code> describing the known servers, or
+   *         a <code>ConnectionDestroyedException</code>
+   * @throws GemFireSecurityException declared by this method; no statement in this body
+   *         throws it directly - presumably propagated from a callee, TODO confirm
+   */
+  private void initializeConnection() throws GatewaySenderException,
+      GemFireSecurityException {
+    this.connectionLifeCycleLock.writeLock().lock(); 
+    try {
+      // Attempt to acquire a connection
+      if (this.sender.getProxy() == null
+          || this.sender.getProxy().isDestroyed()) {
+        this.sender.initProxy();
+      } else {
+        // Existing proxy, new connection: batch numbering starts over.
+        this.processor.resetBatchId();
+      }
+      Connection con;
+      try {
+        if (this.sender.isParallel()) {
+          /*
+           * TODO - The use of acquireConnection should be removed
+           * from the gateway code. This method is fine for tests,
+           * but these connections should really be managed inside
+           * the pool code. If the gateway needs a persistent connection
+           * to a single server, it should have the OpExecutor
+           * hold a reference to the connection (similar to the way
+           * we do with thread local connections).
+           * Use {@link ExecutablePool#setupServerAffinity(boolean)} for
+           * gateway code
+           */
+          con = this.sender.getProxy().acquireConnection();
+          // For a parallel sender the server location does not drive connection
+          // selection; it is acquired afresh whenever needed. The location is
+          // saved here for command purposes only.
+          sender.setServerLocation(con.getServer());  
+        } else {
+          synchronized (this.sender
+              .getLockForConcurrentDispatcher()) {
+            ServerLocation server = this.sender.getServerLocation();
+            if (server != null) {
+              if (logger.isDebugEnabled()) {
+                logger.debug("ServerLocation is: {}. Connecting to this serverLocation...", server);
+              }
+              con = this.sender.getProxy().acquireConnection(server);
+            } else {
+              if (logger.isDebugEnabled()) {
+                logger.debug("ServerLocation is null. Creating new connection. ");
+              }
+              con = this.sender.getProxy().acquireConnection();
+              // Acquired a connection from the pool. Update the server location
+              // in the sender and distribute it to the other senders ONLY IF
+              // THIS SENDER IS PRIMARY.
+              if (this.sender.isPrimary()) {
+                if (sender.getServerLocation() == null) {
+                  sender.setServerLocation(con.getServer());
+                }
+                new UpdateAttributesProcessor(this.sender).distribute(false);
+              }
+            }
+          }
+        }
+      } catch (ServerConnectivityException e) {
+        // Counted before any logging decision, so the first failure is seen as 1 below.
+        this.failedConnectCount++;
+        Throwable ex = null;
+
+        if (e.getCause() instanceof GemFireSecurityException) {
+          ex = e.getCause();
+          if (logConnectionFailure()) {
+            // logging of this message is throttled by logConnectionFailure();
+            // another msg is logged once we connect
+            logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayEventRemoteDispatcher_0_COULD_NOT_CONNECT_1,
+                new Object[] { this.processor.getSender().getId(), ex.getMessage() }));
+          }
+          throw new GatewaySenderException(ex);
+        }
+        // Not a security failure: describe which servers the proxy currently knows about.
+        List<ServerLocation> servers = this.sender.getProxy()
+            .getCurrentServers();
+        String ioMsg = null;
+        if (servers.size() == 0) {
+          ioMsg = LocalizedStrings.GatewayEventRemoteDispatcher_THERE_ARE_NO_ACTIVE_SERVERS
+              .toLocalizedString();
+        } else {
+          // Comma-separated list of the known servers.
+          final StringBuilder buffer = new StringBuilder();
+          for (ServerLocation server : servers) {
+            String endpointName = String.valueOf(server);
+            if (buffer.length() > 0) {
+              buffer.append(", ");
+            }
+            buffer.append(endpointName);
+          }
+          ioMsg = LocalizedStrings.GatewayEventRemoteDispatcher_NO_AVAILABLE_CONNECTION_WAS_FOUND_BUT_THE_FOLLOWING_ACTIVE_SERVERS_EXIST_0
+              .toLocalizedString(buffer.toString());
+        }
+        ex = new IOException(ioMsg);
+        // Set the serverLocation to null so that a new connection can be
+        // obtained in next attempt
+        this.sender.setServerLocation(null);
+        if (this.failedConnectCount == 1) {
+          // only log this message once; another msg is logged once we connect
+          logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayEventRemoteDispatcher__0___COULD_NOT_CONNECT,
+                  this.processor.getSender().getId()));
+
+        }
+        // Wrap the IOException in a GatewayException so it can be processed the
+        // same as the other exceptions that might occur in sendBatch.
+        throw new GatewaySenderException(
+            LocalizedStrings.GatewayEventRemoteDispatcher__0___COULD_NOT_CONNECT
+                .toLocalizedString(this.processor.getSender().getId()), ex);
+      }
+      // Connected: report it, mentioning the number of failed attempts if there were any.
+      if (this.failedConnectCount > 0) {
+        Object[] logArgs = new Object[] { this.processor.getSender().getId(),
+            con, Integer.valueOf(this.failedConnectCount) };
+        logger.info(LocalizedMessage.create(LocalizedStrings.GatewayEventRemoteDispatcher_0_USING_1_AFTER_2_FAILED_CONNECT_ATTEMPTS,
+                logArgs));
+        this.failedConnectCount = 0;
+      } else {
+        Object[] logArgs = new Object[] { this.processor.getSender().getId(),
+            con };
+        logger.info(LocalizedMessage.create(LocalizedStrings.GatewayEventRemoteDispatcher_0_USING_1, logArgs));
+      }
+      this.connection = con;
+      this.processor.checkIfPdxNeedsResend(this.connection.getQueueStatus().getPdxSize());
+    }
+    catch (ConnectionDestroyedException e) {
+      throw new GatewaySenderException(
+          LocalizedStrings.GatewayEventRemoteDispatcher__0___COULD_NOT_CONNECT.toLocalizedString(this.processor
+              .getSender().getId()), e);
+    }
+    finally {
+      this.connectionLifeCycleLock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Decides whether the current connection failure should be logged, so that a remote
+   * site that stays unreachable does not flood the log.
+   * <p>
+   * <code>failedConnectCount</code> is incremented by <code>initializeConnection()</code>
+   * before this method is consulted, so the first failure is seen here as a count of 1.
+   * A count of 0 is also treated as "first" for callers that ask before counting.
+   *
+   * @return <code>true</code> when debug logging is enabled, on the first failure, on the
+   *         30th and 300th failure, and on every 3000th failure thereafter
+   */
+  protected boolean logConnectionFailure() {
+    // always log the first failure
+    if (logger.isDebugEnabled() || this.failedConnectCount <= 1) {
+      return true;
+    }
+    // subsequent failures will be logged on 30th, 300th, 3000th try
+    // each try is at 100millis from higher layer so this accounts for logging
+    // after 3s, 30s and then every 5mins
+    if (this.failedConnectCount >= 3000) {
+      return (this.failedConnectCount % 3000) == 0;
+    }
+    return this.failedConnectCount == 30 || this.failedConnectCount == 300;
+  }
+  
+  /**
+   * Value holder for one acknowledgement: the id of the acknowledged batch together with
+   * either the number of events in it or the <code>BatchException70</code> that came back
+   * for it. Whichever of the two was not supplied keeps its default (0 / <code>null</code>).
+   */
+  public static class GatewayAck {
+    private final int batchId;
+
+    private final int numEvents;
+
+    private final BatchException70 be;
+
+    /**
+     * Creates an acknowledgement that carries a batch exception.
+     */
+    public GatewayAck(BatchException70 be, int bId) {
+      this(bId, 0, be);
+    }
+
+    /**
+     * Creates an acknowledgement that carries an event count and no exception.
+     */
+    public GatewayAck(int batchId, int numEvents) {
+      this(batchId, numEvents, null);
+    }
+
+    private GatewayAck(int batchId, int numEvents, BatchException70 be) {
+      this.batchId = batchId;
+      this.numEvents = numEvents;
+      this.be = be;
+    }
+
+    /**
+     * @return the numEvents
+     */
+    public int getNumEvents() {
+      return this.numEvents;
+    }
+
+    /**
+     * @return the batchId
+     */
+    public int getBatchId() {
+      return this.batchId;
+    }
+
+    /**
+     * @return the batch exception, or <code>null</code> if none was supplied
+     */
+    public BatchException70 getBatchException() {
+      return this.be;
+    }
+  }
+    
+  /**
+   * Daemon thread that repeatedly reads acknowledgements through
+   * <code>readAcknowledgement()</code> and reports them to the event processor. It runs
+   * until {@link #shutdown()} is called, the cache starts cancelling, or an unexpected
+   * exception stops the processor.
+   */
+  class AckReaderThread extends Thread {
+
+    /** Monitor used to signal that {@link #run()} has begun executing. */
+    private final Object runningStateLock = new Object();
+
+    /**
+     * boolean to make a shutdown request
+     */
+    private volatile boolean shutdown = false;
+
+    private final GemFireCacheImpl cache;
+
+    /** True from the start of {@link #run()} until it exits. */
+    private volatile boolean ackReaderThreadRunning = false;
+
+    /**
+     * @param sender sender whose cache is consulted for cancellation
+     * @param processor processor whose name is used in this thread's name
+     */
+    public AckReaderThread(GatewaySender sender, AbstractGatewaySenderEventProcessor processor) {
+      super("AckReaderThread for : " + processor.getName());
+      this.setDaemon(true);
+      this.cache = (GemFireCacheImpl)((AbstractGatewaySender)sender).getCache();
+    }
+
+    /**
+     * Blocks until {@link #run()} has flagged itself as running, or until the calling
+     * thread is interrupted (the interrupt status is restored in that case).
+     */
+    public void waitForRunningAckReaderThreadRunningState() {
+      synchronized (runningStateLock) {
+        while (!this.ackReaderThreadRunning) {
+          try {
+            this.runningStateLock.wait();
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            break;
+          }
+        }
+      }
+    }
+
+    /**
+     * @return true once shutdown was requested or the cache reports a cancel in progress
+     */
+    private boolean checkCancelled() {
+      if (shutdown) {
+        return true;
+      }
+
+      if (cache.getCancelCriterion().isCancelInProgress()) {
+        return true;
+      }
+      return false;
+    }
+
+    @Override
+    public void run() {
+      if (logger.isDebugEnabled()) {
+        logger.debug("AckReaderThread started.. ");
+      }
+
+      // Release anyone blocked in waitForRunningAckReaderThreadRunningState().
+      synchronized (runningStateLock) {
+        ackReaderThreadRunning = true;
+        this.runningStateLock.notifyAll();
+      }
+
+      try {
+        for (;;) {
+          if (checkCancelled()) {
+            break;
+          }
+          GatewayAck ack = readAcknowledgement();
+          if (ack != null) {
+            boolean gotBatchException = ack.getBatchException() != null;
+            int batchId = ack.getBatchId();
+
+            // Both outcomes are reported to the processor through
+            // handleSuccessBatchAck; a batch exception is additionally logged
+            // and counted first.
+            if (gotBatchException) {
+              logger.info(LocalizedMessage.create(
+                  LocalizedStrings.GatewaySenderEventRemoteDispatcher_GATEWAY_SENDER_0_RECEIVED_ACK_FOR_BATCH_ID_1_WITH_EXCEPTION,
+                      new Object[] { processor.getSender(), ack.getBatchId() }, ack.getBatchException()));
+              final GatewaySenderStats statistics = sender.getStatistics();
+              statistics.incBatchesRedistributed();
+              // Log every nested batch exception. For PDX related ones the
+              // batch's pdx events are flagged as not acked (see
+              // logBatchExceptions).
+              logBatchExceptions(ack.getBatchException());
+              processor.handleSuccessBatchAck(batchId);
+
+            } // batch acknowledged with an exception
+            else { // The batch was successful.
+              if (logger.isDebugEnabled()) {
+                logger.debug("Gateway Sender {} : Received ack for batch id {} of {} events",
+                    processor.getSender(), ack.getBatchId(), ack.getNumEvents());
+              }
+              processor.handleSuccessBatchAck(batchId);
+            }
+          } else {
+            // No acknowledgement could be read.
+            if (logger.isDebugEnabled()) {
+              logger.debug("{}: Received null ack from remote site.", processor.getSender());
+            }
+            processor.handleException();
+            try { // This wait is before trying to getting new connection to
+                  // receive ack. Without this there will be continuous call to
+                  // getConnection
+              Thread.sleep(GatewaySender.CONNECTION_RETRY_INTERVAL);
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+            }
+          }
+        }
+      } catch (Exception e) {
+        // Stay quiet when the failure is just a consequence of shutting down.
+        if (!checkCancelled()) {
+          logger.fatal(LocalizedMessage.create(
+              LocalizedStrings.GatewayEventRemoteDispatcher_STOPPING_THE_PROCESSOR_BECAUSE_THE_FOLLOWING_EXCEPTION_OCCURRED_WHILE_PROCESSING_A_BATCH) ,e);
+        }
+        sender.getLifeCycleLock().writeLock().lock();
+        try {
+          processor.stopProcessing();
+          sender.clearTempEventsAfterSenderStopped();
+        } finally {
+          sender.getLifeCycleLock().writeLock().unlock();
+        }
+        // destroyConnection();
+      } finally {
+        if (logger.isDebugEnabled()) {
+          logger.debug("AckReaderThread exiting. ");
+        }
+        ackReaderThreadRunning = false;
+      }
+
+    }
+
+    /**
+     * Tells whether a batch exception cause is the "Unknown pdx type" failure. A cause
+     * without a message is not, rather than a reason to fail with a
+     * NullPointerException inside the reader loop.
+     */
+    private boolean isUnknownPdxTypeFailure(Throwable cause) {
+      if (!(cause instanceof IllegalStateException)) {
+        return false;
+      }
+      String message = cause.getMessage();
+      return message != null && message.contains("Unknown pdx type");
+    }
+
+    /**
+     * Logs each exception nested in the given batch exception together with the event
+     * it refers to, when that event can still be looked up by batch id.
+     * <p>
+     * A <code>RegionDestroyedException</code> is only logged the first time its region
+     * path is seen. For an "Unknown pdx type" failure every pdx event recorded for the
+     * batch is flagged as not acked.
+     *
+     * @param exception the batch exception received with an acknowledgement
+     */
+    private void logBatchExceptions(BatchException70 exception) {
+      for (BatchException70 be : exception.getExceptions()) {
+        boolean logWarning = true;
+        if (be.getCause() instanceof RegionDestroyedException) {
+          RegionDestroyedException rde = (RegionDestroyedException)be
+              .getCause();
+          synchronized (notFoundRegionsSync) {
+            if (notFoundRegions.contains(rde.getRegionFullPath())) {
+              logWarning = false;
+            } else {
+              notFoundRegions.add(rde.getRegionFullPath());
+            }
+          }
+        } else if (isUnknownPdxTypeFailure(be.getCause())) {
+          List<GatewaySenderEventImpl> pdxEvents = processor
+              .getBatchIdToPDXEventsMap().get(be.getBatchId());
+          if (logWarning) {
+            logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayEventRemoteDispatcher_A_BATCHEXCEPTION_OCCURRED_PROCESSING_PDX_EVENT__0,
+                    be.getIndex()), be);
+          }
+          if (pdxEvents != null) {
+            for (GatewaySenderEventImpl senderEvent : pdxEvents) {
+              senderEvent.isAcked = false;
+            }
+            GatewaySenderEventImpl gsEvent = pdxEvents.get(be.getIndex());
+            if (logWarning) {
+              logger.warn(LocalizedMessage.create(
+                  LocalizedStrings.GatewayEventRemoteDispatcher_THE_EVENT_BEING_PROCESSED_WHEN_THE_BATCHEXCEPTION_OCCURRED_WAS__0, gsEvent));
+            }
+          }
+          continue;
+        }
+        if (logWarning) {
+          logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayEventRemoteDispatcher_A_BATCHEXCEPTION_OCCURRED_PROCESSING_EVENT__0,
+              be.getIndex()), be);
+        }
+        List<GatewaySenderEventImpl>[] eventsArr = processor.getBatchIdToEventsMap().get(be.getBatchId()); 
+        if (eventsArr != null) {
+          // The failing index is looked up in the second list of the pair.
+          List<GatewaySenderEventImpl> filteredEvents = eventsArr[1];
+          GatewaySenderEventImpl gsEvent = (GatewaySenderEventImpl)filteredEvents
+              .get(be.getIndex());
+          if (logWarning) {
+            logger.warn(LocalizedMessage.create(
+                LocalizedStrings.GatewayEventRemoteDispatcher_THE_EVENT_BEING_PROCESSED_WHEN_THE_BATCHEXCEPTION_OCCURRED_WAS__0, gsEvent));
+          }
+        }
+      }
+    }
+
+    boolean isRunning() {
+      return this.ackReaderThreadRunning;
+    }
+
+    /**
+     * Requests this thread to stop and waits up to 15 seconds for it to exit, logging a
+     * warning if it is still alive afterwards. The caller's interrupt status is
+     * preserved.
+     */
+    public void shutdown() {
+      // we need to destroy connection irrespective of we are listening on it or
+      // not. No need to take lock as the reader thread may be blocked and we might not
+      // get chance to destroy unless that returns.
+      // The field is read exactly once: without the lock another thread may null it
+      // between a check and a later use.
+      final Connection conn = connection;
+      if (conn != null) {
+        shutDownAckReaderConnection();
+        if (!conn.isDestroyed()) {
+          conn.destroy();
+          sender.getProxy().returnConnection(conn);
+        }
+      }
+      this.shutdown = true;
+      // Clear a pending interrupt so join() can wait, and restore it afterwards.
+      boolean interrupted = Thread.interrupted();
+      try {
+        this.join(15 * 1000);
+      } catch (InterruptedException e) {
+        interrupted = true;
+      } finally {
+        if (interrupted) {
+          Thread.currentThread().interrupt();
+        }
+      }
+      if (this.isAlive()) {
+        logger.warn(LocalizedMessage.create(LocalizedStrings.GatewaySender_ACKREADERTHREAD_IGNORED_CANCELLATION));
+      }
+    }
+
+    /**
+     * Closes the input stream of the current connection, if there is one, to unblock a
+     * reader that is stuck on a read.
+     */
+    private void shutDownAckReaderConnection() {
+      Connection conn = connection;
+      //attempt to unblock the ackReader thread by shutting down the inputStream, if it was stuck on a read
+      try {
+        if (conn != null && conn.getInputStream() != null) {
+          conn.getInputStream().close();
+        }
+      } catch (IOException e) {
+        logger.warn("Unable to shutdown AckReaderThread Connection");
+      } catch (ConnectionDestroyedException e) {
+        logger.info("AckReader shutting down and connection already destroyed");
+      }
+
+    }
+  }
+    
+  /**
+   * Shuts the ack reader thread down if one has been created; does nothing otherwise.
+   */
+  public void stopAckReaderThread() {
+    if (this.ackReaderThread == null) {
+      return;
+    }
+    this.ackReaderThread.shutdown();
+  }
+  
+  /**
+   * @return always <code>true</code> for this dispatcher
+   */
+  @Override
+  public boolean isRemoteDispatcher() {
+    final boolean remote = true;
+    return remote;
+  }
+
+  /**
+   * @return <code>true</code> while this dispatcher holds a connection reference
+   */
+  @Override
+  public boolean isConnectedToRemote() {
+    final boolean hasConnection = this.connection != null;
+    return hasConnection;
+  }
+  
+  /**
+   * Stops the ack reader thread and, when the processor is already stopped, destroys the
+   * connection as well.
+   */
+  public void stop() {
+    stopAckReaderThread();
+    final boolean processorStopped = this.processor.isStopped();
+    if (processorStopped) {
+      destroyConnection();
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderFactoryImpl.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderFactoryImpl.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderFactoryImpl.java
new file mode 100644
index 0000000..4974c6f
--- /dev/null
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderFactoryImpl.java
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
+import com.gemstone.gemfire.cache.client.internal.LocatorDiscoveryCallback;
+import com.gemstone.gemfire.cache.wan.GatewayEventFilter;
+import com.gemstone.gemfire.cache.wan.GatewayEventSubstitutionFilter;
+import com.gemstone.gemfire.cache.wan.GatewaySender;
+import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy;
+import com.gemstone.gemfire.cache.wan.GatewaySenderFactory;
+import com.gemstone.gemfire.cache.wan.GatewayTransportFilter;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderImpl;
+import com.gemstone.gemfire.internal.cache.wan.serial.SerialGatewaySenderImpl;
+import com.gemstone.gemfire.internal.cache.xmlcache.CacheCreation;
+import com.gemstone.gemfire.internal.cache.xmlcache.ParallelGatewaySenderCreation;
+import com.gemstone.gemfire.internal.cache.xmlcache.SerialGatewaySenderCreation;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+
+/**
+ * 
+ * @since GemFire 7.0
+ * 
+ */
+public class GatewaySenderFactoryImpl implements
+    InternalGatewaySenderFactory {
+
+  private static final Logger logger = LogService.getLogger();
+  
+  /**
+   * Used internally to pass the attributes from this factory to the real
+   * GatewaySender it is creating.
+   */
+  private GatewaySenderAttributes attrs = new GatewaySenderAttributes();
+
+  private Cache cache;
+
+  private static final AtomicBoolean GATEWAY_CONNECTION_READ_TIMEOUT_PROPERTY_CHECKED = new AtomicBoolean(false);
+
+  /**
+   * Creates a factory bound to the given cache.
+   *
+   * @param cache the cache stored in this factory
+   */
+  public GatewaySenderFactoryImpl(Cache cache) {
+    super();
+    this.cache = cache;
+  }
+
+  /**
+   * Records whether the sender to be created is parallel.
+   *
+   * @param isParallel value stored in the sender attributes
+   * @return this factory, to allow call chaining
+   */
+  public GatewaySenderFactory setParallel(boolean isParallel) {
+    attrs.isParallel = isParallel;
+    return this;
+  }
+  
+  /**
+   * Records the "for internal use" flag in the sender attributes.
+   *
+   * @param isForInternalUse value stored in the sender attributes
+   * @return this factory, to allow call chaining
+   */
+  public GatewaySenderFactory setForInternalUse(boolean isForInternalUse) {
+    attrs.isForInternalUse = isForInternalUse;
+    return this;
+  }
+  
+  /**
+   * Adds an event filter to the sender attributes.
+   *
+   * @param filter the filter handed to the attributes
+   * @return this factory, to allow call chaining
+   */
+  public GatewaySenderFactory addGatewayEventFilter(GatewayEventFilter filter) {
+    attrs.addGatewayEventFilter(filter);
+    return this;
+  }
+
+  /**
+   * Adds a transport filter to the sender attributes.
+   *
+   * @param filter the filter handed to the attributes
+   * @return this factory, to allow call chaining
+   */
+  public GatewaySenderFactory addGatewayTransportFilter(GatewayTransportFilter filter) {
+    attrs.addGatewayTransportFilter(filter);
+    return this;
+  }
+
+  /**
+   * Adds an asynchronous event listener to the sender attributes.
+   *
+   * @param listener the listener handed to the attributes
+   * @return this factory, to allow call chaining
+   */
+  public GatewaySenderFactory addAsyncEventListener(AsyncEventListener listener) {
+    attrs.addAsyncEventListener(listener);
+    return this;
+  }
+  
+  /**
+   * Records the socket buffer size in the sender attributes.
+   *
+   * @param socketBufferSize value stored in the sender attributes
+   * @return this factory, to allow call chaining
+   */
+  public GatewaySenderFactory setSocketBufferSize(int socketBufferSize) {
+    attrs.socketBufferSize = socketBufferSize;
+    return this;
+  }
+
+  /**
+   * Records the socket read timeout in the sender attributes.
+   *
+   * @param socketReadTimeout value stored in the sender attributes
+   * @return this factory, to allow call chaining
+   */
+  public GatewaySenderFactory setSocketReadTimeout(int socketReadTimeout) {
+    attrs.socketReadTimeout = socketReadTimeout;
+    return this;
+  }
+
+  /**
+   * Records the disk store name in the sender attributes.
+   *
+   * @param diskStoreName value stored in the sender attributes
+   * @return this factory, to allow call chaining
+   */
+  public GatewaySenderFactory setDiskStoreName(String diskStoreName) {
+    attrs.diskStoreName = diskStoreName;
+    return this;
+  }
+
+  /**
+   * Records the maximum queue memory in the sender attributes.
+   *
+   * @param maximumQueueMemory value stored in the sender attributes
+   * @return this factory, to allow call chaining
+   */
+  public GatewaySenderFactory setMaximumQueueMemory(int maximumQueueMemory) {
+    attrs.maximumQueueMemory = maximumQueueMemory;
+    return this;
+  }
+
+  /**
+   * Records the batch size in the sender attributes.
+   *
+   * @param batchSize value stored in the sender attributes
+   * @return this factory, to allow call chaining
+   */
+  public GatewaySenderFactory setBatchSize(int batchSize) {
+    attrs.batchSize = batchSize;
+    return this;
+  }
+
+  /**
+   * Records the batch time interval in the sender attributes.
+   *
+   * @param batchTimeInterval value stored in the sender attributes
+   * @return this factory, to allow call chaining
+   */
+  public GatewaySenderFactory setBatchTimeInterval(int batchTimeInterval) {
+    attrs.batchTimeInterval = batchTimeInterval;
+    return this;
+  }
+
+  public GatewaySenderFactory setBatchConflationEnabled(
+      boolean enableBatchConflation) {
+    this.attrs.isBatchConflationEnabled = enableBatchConflation;
+    return this;
+  }
+
+  public GatewaySenderFactory setPersistenceEnabled(
+      boolean enablePersistence) {
+    this.attrs.isPersistenceEnabled = enablePersistence;
+    return this;
+  }
+
+  /**
+   * Records the alert threshold in the sender attributes.
+   *
+   * @param threshold value stored in the sender attributes
+   * @return this factory, to allow call chaining
+   */
+  public GatewaySenderFactory setAlertThreshold(int threshold) {
+    attrs.alertThreshold = threshold;
+    return this;
+  }
+
+  /**
+   * Records the manual-start flag in the sender attributes.
+   *
+   * @param start value stored in the sender attributes
+   * @return this factory, to allow call chaining
+   */
+  public GatewaySenderFactory setManualStart(boolean start) {
+    attrs.manualStart = start;
+    return this;
+  }
+
+  /**
+   * Records the locator discovery callback in the sender attributes.
+   *
+   * @param locCallback value stored in the sender attributes
+   * @return this factory, to allow call chaining
+   */
+  public GatewaySenderFactory setLocatorDiscoveryCallback(LocatorDiscoveryCallback locCallback) {
+    attrs.locatorDiscoveryCallback = locCallback;
+    return this;
+  }
+
+  /**
+   * Records the disk-synchronous flag in the sender attributes.
+   *
+   * @param isSynchronous value stored in the sender attributes
+   * @return this factory, to allow call chaining
+   */
+  @Override
+  public GatewaySenderFactory setDiskSynchronous(boolean isSynchronous) {
+    attrs.isDiskSynchronous = isSynchronous;
+    return this;
+  }
+  
+  /**
+   * Records the number of dispatcher threads in the sender attributes. When more than
+   * one thread is requested and no order policy has been set yet, the policy is set to
+   * <code>GatewaySender.DEFAULT_ORDER_POLICY</code>.
+   *
+   * @param numThreads value stored in the sender attributes
+   * @return this factory, to allow call chaining
+   */
+  @Override
+  public GatewaySenderFactory setDispatcherThreads(int numThreads) {
+    final boolean needsDefaultPolicy = (numThreads > 1) && attrs.policy == null;
+    if (needsDefaultPolicy) {
+      attrs.policy = GatewaySender.DEFAULT_ORDER_POLICY;
+    }
+    attrs.dispatcherThreads = numThreads;
+    return this;
+  }
+
+  /**
+   * Records the parallelism in the sender attributes and sets the order policy to
+   * <code>GatewaySender.DEFAULT_ORDER_POLICY</code>, replacing any policy set before.
+   *
+   * @param parallel value stored as the parallelism
+   * @return this factory, to allow call chaining
+   */
+  public GatewaySenderFactory setParallelFactorForReplicatedRegion(int parallel) {
+    attrs.parallelism = parallel;
+    attrs.policy = GatewaySender.DEFAULT_ORDER_POLICY;
+    return this;
+  }
+    
+  /**
+   * Records the order policy in the sender attributes.
+   *
+   * @param policy value stored in the sender attributes
+   * @return this factory, to allow call chaining
+   */
+  @Override
+  public GatewaySenderFactory setOrderPolicy(OrderPolicy policy) {
+    attrs.policy = policy;
+    return this;
+  }
+  
+  /**
+   * Records the bucket-sorted flag in the sender attributes.
+   *
+   * @param isBucketSorted value stored in the sender attributes
+   * @return this factory, to allow call chaining
+   */
+  public GatewaySenderFactory setBucketSorted(boolean isBucketSorted) {
+    attrs.isBucketSorted = isBucketSorted;
+    return this;
+  }
+  public GatewaySender create(String id, int remoteDSId) {
+    int myDSId = InternalDistributedSystem.getAnyInstance()
+        .getDistributionManager().getDistributedSystemId();
+    if (remoteDSId == myDSId) {
+      throw new GatewaySenderException(
+          LocalizedStrings.GatewaySenderImpl_GATEWAY_0_CANNOT_BE_CREATED_WITH_REMOTE_SITE_ID_EQUAL_TO_THIS_SITE_ID
+              .toLocalizedString(id));
+    }
+    if (remoteDSId < 0) {
+      throw new GatewaySenderException(
+          LocalizedStrings.GatewaySenderImpl_GATEWAY_0_CANNOT_BE_CREATED_WITH_REMOTE_SITE_ID_LESS_THAN_ZERO
+              .toLocalizedString(id));
+    }
+    this.attrs.id = id;
+    this.attrs.remoteDs = remoteDSId;
+    GatewaySender sender = null;
+
+    if(this.attrs.getDispatcherThreads() <= 0){
+      throw new GatewaySenderException(
+          LocalizedStrings.GatewaySenderImpl_GATEWAY_SENDER_0_CANNOT_HAVE_DISPATCHER_THREADS_LESS_THAN_1
+              .toLocalizedString(id));
+    }
+
+    // Verify socket read timeout if a proper logger is available
+    if (this.cache instanceof GemFireCacheImpl) {
+      // If socket read timeout is less than the minimum, log a warning.
+      // Ideally, this should throw a GatewaySenderException, but wan dunit tests
+      // were failing, and we were running out of time to change them.
+      if (this.attrs.getSocketReadTimeout() != 0
+          && this.attrs.getSocketReadTimeout() < GatewaySender.MINIMUM_SOCKET_READ_TIMEOUT) {
+        logger.warn(LocalizedMessage.create(LocalizedStrings.Gateway_CONFIGURED_SOCKET_READ_TIMEOUT_TOO_LOW,
+            new Object[] { "GatewaySender " + id, this.attrs.getSocketReadTimeout(), GatewaySender.MINIMUM_SOCKET_READ_TIMEOUT }));
+        this.attrs.socketReadTimeout = GatewaySender.MINIMUM_SOCKET_READ_TIMEOUT;
+      }
+
+      // Log a warning if the old system property is set.
+      if (GATEWAY_CONNECTION_READ_TIMEOUT_PROPERTY_CHECKED.compareAndSet(false, true)) {
+        if (System.getProperty(GatewaySender.GATEWAY_CONNECTION_READ_TIMEOUT_PROPERTY) != null) {
+          logger.warn(LocalizedMessage.create(LocalizedStrings.Gateway_OBSOLETE_SYSTEM_POPERTY,
+              new Object[] { GatewaySender.GATEWAY_CONNECTION_READ_TIMEOUT_PROPERTY, "GatewaySender socket read timeout" }));
+        }
+      }
+    }
+
+    if (this.attrs.isParallel()) {
+//      if(this.attrs.getDispatcherThreads() != 1){
+//        throw new GatewaySenderException(
+//            LocalizedStrings.GatewaySenderImpl_PARALLEL_GATEWAY_SENDER_0_CANNOT_BE_CREATED_WITH_DISPATCHER_THREADS_OTHER_THAN_1
+//                .toLocalizedString(id));
+//      }
+      if ((this.attrs.getOrderPolicy() != null)
+          && this.attrs.getOrderPolicy().equals(OrderPolicy.THREAD)) {
+        throw new GatewaySenderException(
+            LocalizedStrings.GatewaySenderImpl_PARALLEL_GATEWAY_SENDER_0_CANNOT_BE_CREATED_WITH_ORDER_POLICY_1
+                .toLocalizedString(id, this.attrs.getOrderPolicy()));
+      }
+      if (this.cache instanceof GemFireCacheImpl) {
+        sender = new ParallelGatewaySenderImpl(this.cache, this.attrs);
+        ((GemFireCacheImpl)this.cache).addGatewaySender(sender);
+        
+        if (!this.attrs.isManualStart()) {
+          sender.start();
+        }
+      }
+      else if (this.cache instanceof CacheCreation) {
+        sender = new ParallelGatewaySenderCreation(this.cache, this.attrs);
+        ((CacheCreation)this.cache).addGatewaySender(sender);
+      }
+    }
+    else {
+      if (this.attrs.getAsyncEventListeners().size() > 0) {
+        throw new GatewaySenderException(
+            LocalizedStrings.SerialGatewaySenderImpl_GATEWAY_0_CANNOT_DEFINE_A_REMOTE_SITE_BECAUSE_AT_LEAST_ONE_LISTENER_IS_ALREADY_ADDED
+                .toLocalizedString(id));
+      }
+//      if (this.attrs.getOrderPolicy() != null) {
+//        if (this.attrs.getDispatcherThreads() == GatewaySender.DEFAULT_DISPATCHER_THREADS) {
+//          throw new GatewaySenderException(
+//              LocalizedStrings.SerialGatewaySender_INVALID_GATEWAY_SENDER_ORDER_POLICY_CONCURRENCY_0
+//                  .toLocalizedString(id));
+//        }
+//      }
+      if (this.attrs.getOrderPolicy() == null && this.attrs.getDispatcherThreads() > 1) {
+        this.attrs.policy = GatewaySender.DEFAULT_ORDER_POLICY;
+      }
+      if (this.cache instanceof GemFireCacheImpl) {
+        sender = new SerialGatewaySenderImpl(this.cache, this.attrs);
+        ((GemFireCacheImpl)this.cache).addGatewaySender(sender);
+
+        if (!this.attrs.isManualStart()) {
+          sender.start();
+        }
+      }
+      else if (this.cache instanceof CacheCreation) {
+        sender = new SerialGatewaySenderCreation(this.cache, this.attrs);
+        ((CacheCreation)this.cache).addGatewaySender(sender);
+      }
+    }
+    return sender;
+  }
+
+  public GatewaySender create(String id) {
+    this.attrs.id = id;
+    GatewaySender sender = null;
+
+    if(this.attrs.getDispatcherThreads() <= 0) {
+      throw new AsyncEventQueueConfigurationException(
+          LocalizedStrings.AsyncEventQueue_0_CANNOT_HAVE_DISPATCHER_THREADS_LESS_THAN_1
+              .toLocalizedString(id));
+    }
+    
+    if (this.attrs.isParallel()) {
+      if ((this.attrs.getOrderPolicy() != null)
+          && this.attrs.getOrderPolicy().equals(OrderPolicy.THREAD)) {
+        throw new AsyncEventQueueConfigurationException(
+            LocalizedStrings.AsyncEventQueue_0_CANNOT_BE_CREATED_WITH_ORDER_POLICY_1
+                .toLocalizedString(id, this.attrs.getOrderPolicy()));
+      }
+      
+      if (this.cache instanceof GemFireCacheImpl) {
+        sender = new ParallelGatewaySenderImpl(this.cache, this.attrs);
+        ((GemFireCacheImpl)this.cache).addGatewaySender(sender);
+        if (!this.attrs.isManualStart()) {
+          sender.start();
+        }
+      }
+      else if (this.cache instanceof CacheCreation) {
+        sender = new ParallelGatewaySenderCreation(this.cache, this.attrs);
+        ((CacheCreation)this.cache).addGatewaySender(sender);
+      }
+    }
+    else {
+//      if (this.attrs.getOrderPolicy() != null) {
+//        if (this.attrs.getDispatcherThreads() == GatewaySender.DEFAULT_DISPATCHER_THREADS) {
+//          throw new AsyncEventQueueConfigurationException(
+//              LocalizedStrings.AsyncEventQueue_INVALID_ORDER_POLICY_CONCURRENCY_0
+//                  .toLocalizedString(id));
+//        }
+//      }
+      if (this.attrs.getOrderPolicy() == null && this.attrs.getDispatcherThreads() > 1) {
+         this.attrs.policy = GatewaySender.DEFAULT_ORDER_POLICY;
+      }
+      if (this.cache instanceof GemFireCacheImpl) {
+        sender = new SerialGatewaySenderImpl(this.cache, this.attrs);
+        ((GemFireCacheImpl)this.cache).addGatewaySender(sender);
+        if (!this.attrs.isManualStart()) {
+          sender.start();
+        }
+      }
+      else if (this.cache instanceof CacheCreation) {
+        sender = new SerialGatewaySenderCreation(this.cache, this.attrs);
+        ((CacheCreation)this.cache).addGatewaySender(sender);
+      }
+    }
+    return sender;
+  }
+  
+  /**
+   * Removes the given event filter from the attributes this factory is building.
+   *
+   * @param filter filter to remove from {@code attrs.eventFilters}
+   * @return this factory, so calls can be chained
+   */
+  public GatewaySenderFactory removeGatewayEventFilter(GatewayEventFilter filter) {
+    this.attrs.eventFilters.remove(filter);
+    return this;
+  }
+
+  /**
+   * Removes the given transport filter from the attributes this factory is building.
+   *
+   * @param filter filter to remove from {@code attrs.transFilters}
+   * @return this factory, so calls can be chained
+   */
+  public GatewaySenderFactory removeGatewayTransportFilter(GatewayTransportFilter filter) {
+    this.attrs.transFilters.remove(filter);
+    return this;
+  }
+  
+  /**
+   * Sets the event substitution filter on the attributes this factory is
+   * building, replacing any previously set one.
+   *
+   * @param filter value written to {@code attrs.eventSubstitutionFilter}
+   * @return this factory, so calls can be chained
+   */
+  public GatewaySenderFactory setGatewayEventSubstitutionFilter(GatewayEventSubstitutionFilter filter) {
+    this.attrs.eventSubstitutionFilter = filter;
+    return this;
+  }
+
+  /**
+   * Copies the configuration of an existing sender into this factory's
+   * attributes. Event and transport filters are appended to the filters
+   * already present; every other copied setting is overwritten.
+   * <p>
+   * NOTE(review): {@code attrs.isBucketSorted} is not copied here; confirm
+   * that this is intended.
+   *
+   * @param senderCreation sender whose settings are read
+   */
+  public void configureGatewaySender(GatewaySender senderCreation) {
+    // Mode and start-up behaviour.
+    this.attrs.isParallel = senderCreation.isParallel();
+    this.attrs.manualStart = senderCreation.isManualStart();
+
+    // Socket settings.
+    this.attrs.socketBufferSize = senderCreation.getSocketBufferSize();
+    this.attrs.socketReadTimeout = senderCreation.getSocketReadTimeout();
+
+    // Batching.
+    this.attrs.isBatchConflationEnabled = senderCreation.isBatchConflationEnabled();
+    this.attrs.batchSize = senderCreation.getBatchSize();
+    this.attrs.batchTimeInterval = senderCreation.getBatchTimeInterval();
+
+    // Persistence and queue limits.
+    this.attrs.isPersistenceEnabled = senderCreation.isPersistenceEnabled();
+    this.attrs.diskStoreName = senderCreation.getDiskStoreName();
+    this.attrs.isDiskSynchronous = senderCreation.isDiskSynchronous();
+    this.attrs.maximumQueueMemory = senderCreation.getMaximumQueueMemory();
+    this.attrs.alertThreshold = senderCreation.getAlertThreshold();
+
+    // Dispatching.
+    this.attrs.dispatcherThreads = senderCreation.getDispatcherThreads();
+    this.attrs.policy = senderCreation.getOrderPolicy();
+
+    // Filters are added one by one to whatever is already configured.
+    for (GatewayEventFilter eventFilter : senderCreation.getGatewayEventFilters()) {
+      this.attrs.eventFilters.add(eventFilter);
+    }
+    for (GatewayTransportFilter transportFilter : senderCreation.getGatewayTransportFilters()) {
+      this.attrs.transFilters.add(transportFilter);
+    }
+    this.attrs.eventSubstitutionFilter = senderCreation.getGatewayEventSubstitutionFilter();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java
new file mode 100644
index 0000000..322b1ba
--- /dev/null
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan.parallel;
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.EntryOperation;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
+import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl;
+import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueStats;
+import com.gemstone.gemfire.cache.wan.GatewaySender;
+import com.gemstone.gemfire.cache.wan.GatewayTransportFilter;
+import com.gemstone.gemfire.distributed.internal.DistributionAdvisor.Profile;
+import com.gemstone.gemfire.distributed.internal.DistributionAdvisor;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.distributed.internal.ResourceEvent;
+import com.gemstone.gemfire.internal.cache.DistributedRegion;
+import com.gemstone.gemfire.internal.cache.EntryEventImpl;
+import com.gemstone.gemfire.internal.cache.EventID;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.PartitionedRegionHelper;
+import com.gemstone.gemfire.internal.cache.UpdateAttributesProcessor;
+import com.gemstone.gemfire.internal.cache.ha.ThreadIdentifier;
+import com.gemstone.gemfire.internal.cache.wan.AbstractRemoteGatewaySender;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderAdvisor;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderStats;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderAdvisor.GatewaySenderProfile;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderAttributes;
+import com.gemstone.gemfire.internal.cache.xmlcache.CacheCreation;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+
+/**
+ * Parallel {@link GatewaySender} implementation built on
+ * {@link AbstractRemoteGatewaySender}. Event processing is delegated to a
+ * {@link RemoteConcurrentParallelGatewaySenderEventProcessor} that is created
+ * each time the sender is started.
+ *
+ * @since GemFire 7.0
+ */
+public class ParallelGatewaySenderImpl extends AbstractRemoteGatewaySender {
+
+  private static final Logger logger = LogService.getLogger();
+
+  final ThreadGroup loggerGroup = LoggingThreadGroup.createThreadGroup(
+      "Remote Site Discovery Logger Group", logger);
+
+  /** Creates a sender through the no-arg superclass constructor and flags it as parallel. */
+  public ParallelGatewaySenderImpl(){
+    super();
+    this.isParallel = true;
+  }
+
+  /**
+   * Creates a sender for the given cache using the given attributes.
+   *
+   * @param cache cache the sender belongs to
+   * @param attrs configuration of the sender
+   */
+  public ParallelGatewaySenderImpl(Cache cache, GatewaySenderAttributes attrs) {
+    super(cache, attrs);
+  }
+
+  /**
+   * Starts this sender while holding the life-cycle write lock. If the sender
+   * is already running, a warning is logged and nothing else happens.
+   *
+   * @throws IllegalStateException if a remote distributed system id is
+   *         configured but the distributed system has no locators
+   */
+  @Override
+  public void start() {
+    this.getLifeCycleLock().writeLock().lock();
+    try {
+      if (isRunning()) {
+        logger.warn(LocalizedMessage.create(LocalizedStrings.GatewaySender_SENDER_0_IS_ALREADY_RUNNING, this.getId()));
+        return;
+      }
+
+      if (this.remoteDSId != DEFAULT_DISTRIBUTED_SYSTEM_ID) {
+        String locators = ((GemFireCacheImpl)this.cache).getDistributedSystem()
+            .getConfig().getLocators();
+        if (locators.isEmpty()) {
+          throw new IllegalStateException(
+              LocalizedStrings.AbstractGatewaySender_LOCATOR_SHOULD_BE_CONFIGURED_BEFORE_STARTING_GATEWAY_SENDER
+                  .toLocalizedString());
+        }
+      }
+      /*
+       * Now onwards all processing will happen through "ConcurrentParallelGatewaySenderEventProcessor"
+       * we have made "ParallelGatewaySenderEventProcessor" and "ParallelGatewaySenderQueue" as a
+       * utility classes of Concurrent version of processor and queue.
+       */
+      eventProcessor = new RemoteConcurrentParallelGatewaySenderEventProcessor(this);
+
+      eventProcessor.start();
+      waitForRunningStatus();
+      //Only notify the type registry if this is a WAN gateway queue
+      if(!isAsyncEventQueue()) {
+        ((GemFireCacheImpl) getCache()).getPdxRegistry().gatewaySenderStarted(this);
+      }
+      new UpdateAttributesProcessor(this).distribute(false);
+
+      InternalDistributedSystem system = (InternalDistributedSystem) this.cache
+          .getDistributedSystem();
+      system.handleResourceEvent(ResourceEvent.GATEWAYSENDER_START, this);
+
+      logger.info(LocalizedMessage.create(LocalizedStrings.ParallelGatewaySenderImpl_STARTED__0, this));
+
+      enqueueTempEvents();
+    }
+    finally {
+      this.getLifeCycleLock().writeLock().unlock();
+    }
+  }
+
+  /**
+   * Stops this sender while holding the life-cycle write lock. Does nothing
+   * when the sender is not running. The event processor reference is kept
+   * after stopping.
+   */
+  @Override
+  public void stop() {
+    this.getLifeCycleLock().writeLock().lock();
+    try {
+      if (!this.isRunning()) {
+        return;
+      }
+      // Read the processor field once so that every step below works on the
+      // same instance and shares the same null check.
+      final AbstractGatewaySenderEventProcessor ev = this.eventProcessor;
+
+      // Stop the dispatcher
+      if (ev != null && !ev.isStopped()) {
+        ev.stopProcessing();
+      }
+
+      // Stop the proxy (after the dispatcher, so the socket is still
+      // alive until after the dispatcher has stopped)
+      stompProxyDead();
+
+      // Close the listeners
+      for (AsyncEventListener listener : this.listeners) {
+        listener.close();
+      }
+      //stop the running threads, open sockets if any
+      if (ev != null) {
+        ((ConcurrentParallelGatewaySenderQueue) ev.getQueue()).cleanUp();
+      }
+
+      logger.info(LocalizedMessage.create(LocalizedStrings.GatewayImpl_STOPPED__0, this));
+
+      InternalDistributedSystem system = (InternalDistributedSystem) this.cache
+          .getDistributedSystem();
+      system.handleResourceEvent(ResourceEvent.GATEWAYSENDER_STOP, this);
+
+      clearTempEventsAfterSenderStopped();
+      // Keep the eventProcessor around so we can ask it for the regionQueues later.
+      // Tests expect to be able to do this.
+    }
+    finally {
+      this.getLifeCycleLock().writeLock().unlock();
+    }
+  }
+
+  /**
+   * Returns a short description containing the sender id, the remote
+   * distributed system id and the running state.
+   */
+  @Override
+  public String toString() {
+    // The buffer never leaves this method, so the unsynchronized builder suffices.
+    StringBuilder sb = new StringBuilder();
+    sb.append("ParallelGatewaySender{");
+    sb.append("id=").append(getId());
+    sb.append(",remoteDsId=").append(getRemoteDSId());
+    sb.append(",isRunning =").append(isRunning());
+    sb.append("}");
+    return sb.toString();
+  }
+
+  /**
+   * Copies this sender's state and configuration into the given profile.
+   *
+   * @param profile profile to populate; it is cast to {@link GatewaySenderProfile}
+   */
+  public void fillInProfile(Profile profile) {
+    assert profile instanceof GatewaySenderProfile;
+    GatewaySenderProfile pf = (GatewaySenderProfile)profile;
+    pf.Id = getId();
+    pf.remoteDSId = getRemoteDSId();
+    pf.isRunning = isRunning();
+    pf.isPrimary = isPrimary();
+    pf.isParallel = true;
+    pf.isBatchConflationEnabled = isBatchConflationEnabled();
+    pf.isPersistenceEnabled = isPersistenceEnabled();
+    pf.alertThreshold = getAlertThreshold();
+    pf.manualStart = isManualStart();
+    pf.dispatcherThreads = getDispatcherThreads();
+    pf.orderPolicy = getOrderPolicy();
+    // Filters and listeners are recorded by class name only.
+    for (com.gemstone.gemfire.cache.wan.GatewayEventFilter filter : getGatewayEventFilters()) {
+      pf.eventFiltersClassNames.add(filter.getClass().getName());
+    }
+    for (GatewayTransportFilter filter : getGatewayTransportFilters()) {
+      pf.transFiltersClassNames.add(filter.getClass().getName());
+    }
+    for (AsyncEventListener listener : getAsyncEventListeners()) {
+      pf.senderEventListenerClassNames.add(listener.getClass().getName());
+    }
+    pf.isDiskSynchronous = isDiskSynchronous();
+  }
+
+  /**
+   * Replaces the event id of the cloned event with one built from the original
+   * membership id and sequence id, a bucket id, and a new thread id derived
+   * from that bucket id, the originating thread id and this sender's event id
+   * index.
+   *
+   * @param clonedEvent event whose event id is rewritten in place
+   * @see com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender#setModifiedEventId(com.gemstone.gemfire.internal.cache.EntryEventImpl)
+   */
+  @Override
+  protected void setModifiedEventId(EntryEventImpl clonedEvent) {
+    final int bucketId;
+    //merged from 42004
+    if (clonedEvent.getRegion() instanceof DistributedRegion) {
+      // Distributed regions hash the key over the configured maximum parallelism.
+      bucketId = PartitionedRegionHelper.getHashKey(clonedEvent.getKey(),
+          getMaxParallelismForReplicatedRegion());
+    }
+    else {
+      bucketId = PartitionedRegionHelper
+          .getHashKey((EntryOperation)clonedEvent);
+    }
+    EventID originalEventId = clonedEvent.getEventId();
+    long originatingThreadId = ThreadIdentifier.getRealThreadID(originalEventId.getThreadID());
+
+    // In case of parallel as all events go through primary buckets
+    // we don't need to generate different threadId for secondary buckets
+    // as they will be rejected if seen at PR level itself
+    long newThreadId = ThreadIdentifier
+        .createFakeThreadIDForParallelGSPrimaryBucket(bucketId,
+            originatingThreadId, getEventIdIndex());
+
+    EventID newEventId = new EventID(originalEventId.getMembershipID(),
+        newThreadId, originalEventId.getSequenceID(), bucketId);
+    if (logger.isDebugEnabled()) {
+      logger.debug("{}: Generated event id for event with key={}, bucketId={}, original event id={}, threadId={}, new event id={}, newThreadId={}",
+          this, clonedEvent.getKey(), bucketId, originalEventId, originatingThreadId, newEventId, newThreadId);
+    }
+    clonedEvent.setEventId(newEventId);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/RemoteConcurrentParallelGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/RemoteConcurrentParallelGatewaySenderEventProcessor.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/RemoteConcurrentParallelGatewaySenderEventProcessor.java
new file mode 100644
index 0000000..35cdece
--- /dev/null
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/RemoteConcurrentParallelGatewaySenderEventProcessor.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan.parallel;
+
+
+import java.util.Set;
+
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderStats;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventRemoteDispatcher;
+import com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderEventProcessor;
+/**
+ * Remote version of GatewaySenderEvent Processor
+ *
+ */
+public class RemoteConcurrentParallelGatewaySenderEventProcessor extends ConcurrentParallelGatewaySenderEventProcessor{
+  
+  public RemoteConcurrentParallelGatewaySenderEventProcessor(
+      AbstractGatewaySender sender) {
+    super(sender);
+  }
+
+  @Override
+  protected void createProcessors(int dispatcherThreads, Set<Region> targetRs) {
+    processors = new RemoteParallelGatewaySenderEventProcessor[sender.getDispatcherThreads()];
+    if (logger.isDebugEnabled()) {
+      logger.debug("Creating GatewaySenderEventProcessor");
+    }
+    for (int i = 0; i < sender.getDispatcherThreads(); i++) {
+      processors[i] = new RemoteParallelGatewaySenderEventProcessor(sender,
+          targetRs, i, sender.getDispatcherThreads());
+    }
+  }
+  
+  @Override
+  protected void rebalance() {
+    GatewaySenderStats statistics = this.sender.getStatistics();
+    long startTime = statistics.startLoadBalance();
+    try {
+      for (ParallelGatewaySenderEventProcessor parallelProcessor : this.processors) {
+        GatewaySenderEventRemoteDispatcher remoteDispatcher = (GatewaySenderEventRemoteDispatcher)parallelProcessor.getDispatcher();
+        if (remoteDispatcher.isConnectedToRemote()) {
+          remoteDispatcher.stopAckReaderThread();
+          remoteDispatcher.destroyConnection();
+        }
+      }
+    } finally {
+      statistics.endLoadBalance(startTime);
+    }
+  }
+}


Mime
View raw message