geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r..@apache.org
Subject [24/51] [partial] incubator-geode git commit: WAN and CQ code drop under the Pivotal SGA
Date Wed, 20 Jan 2016 02:22:30 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java b/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
new file mode 100644
index 0000000..4a4446c
--- /dev/null
+++ b/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
@@ -0,0 +1,758 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014, Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.internal.cache.wan;
+
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.CancelException;
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.RegionDestroyedException;
+import com.gemstone.gemfire.cache.client.ServerConnectivityException;
+import com.gemstone.gemfire.cache.client.ServerOperationException;
+import com.gemstone.gemfire.cache.client.internal.Connection;
+import com.gemstone.gemfire.cache.client.internal.ServerProxy;
+import com.gemstone.gemfire.cache.client.internal.pooling.ConnectionDestroyedException;
+import com.gemstone.gemfire.cache.wan.GatewaySender;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.UpdateAttributesProcessor;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import com.gemstone.gemfire.pdx.PdxRegistryMismatchException;
+import com.gemstone.gemfire.security.GemFireSecurityException;
+import com.gemstone.gemfire.cache.client.internal.SenderProxy;
+
+/**
+ * @author Suranjan Kumar
+ * @author Yogesh Mahajan
+ * @since 7.0
+ *
+ */
+public class GatewaySenderEventRemoteDispatcher implements
+    GatewaySenderEventDispatcher {
+
+  private static final Logger logger = LogService.getLogger();
+  
+  private final AbstractGatewaySenderEventProcessor processor;
+
+  private volatile Connection connection;
+
+  private final Set<String> notFoundRegions = new HashSet<String>();
+  
+  private final Object notFoundRegionsSync = new Object();
+  
+  private final AbstractGatewaySender sender;
+  
+  private AckReaderThread ackReaderThread;
+  
+  private ReentrantReadWriteLock connectionLifeCycleLock = new ReentrantReadWriteLock();
+  
+  /**
+   * This count is reset to 0 each time a successful connection is made.
+   */
+  private int failedConnectCount = 0;
+  
+  public GatewaySenderEventRemoteDispatcher(AbstractGatewaySenderEventProcessor eventProcessor) {
+    this.processor = eventProcessor;
+    this.sender = eventProcessor.getSender();
+//    this.ackReaderThread = new AckReaderThread(sender);
+    try {
+      initializeConnection();
+    }
+    catch (GatewaySenderException e) {
+      if (e.getCause() instanceof GemFireSecurityException) {
+        throw e;
+      }
+    }
+  }
+  
+  protected GatewayAck readAcknowledgement(int lastBatchIdRead) {
+    SenderProxy sp = new SenderProxy(this.processor.getSender().getProxy());
+    GatewayAck ack = null;
+    Exception ex;
+    try {
+      connection = getConnection(false);
+      if (logger.isDebugEnabled()) {
+        logger.debug(" Receiving ack on the thread {}", connection);
+      }
+      this.connectionLifeCycleLock.readLock().lock();
+      try {
+        if (connection != null) {
+          ack = (GatewayAck)sp.receiveAckFromReceiver(connection);
+        }
+      } finally {
+        this.connectionLifeCycleLock.readLock().unlock();
+      }
+
+    } catch (Exception e) {
+      Throwable t = e.getCause();
+      if (t instanceof BatchException70) {
+        // A BatchException has occurred.
+        // Do not process the connection as dead since it is not dead.
+        ex = (BatchException70)t;
+      } else if (e instanceof GatewaySenderException) { //This Exception is thrown from getConnection
+        ex = (Exception) e.getCause();
+      }else {
+        ex = e;
+        // keep using the connection if we had a batch exception. Else, destroy
+        // it
+        destroyConnection();
+      }
+      if (this.sender.getProxy() == null || this.sender.getProxy().isDestroyed()) {
+        // if our pool is shutdown then just be silent
+      } else if (ex instanceof IOException
+          || (ex instanceof ServerConnectivityException && !(ex.getCause() instanceof PdxRegistryMismatchException))
+          || ex instanceof ConnectionDestroyedException) {
+        // If the cause is an IOException or a ServerException, sleep and retry.
+        // Sleep for a bit and recheck.
+        try {
+          Thread.sleep(100);
+        } catch (InterruptedException ie) {
+          Thread.currentThread().interrupt();
+        }
+      } else {
+        if (!(ex instanceof CancelException)) {
+          logger.fatal(LocalizedMessage.create(
+                  LocalizedStrings.GatewayEventRemoteDispatcher_STOPPING_THE_PROCESSOR_BECAUSE_THE_FOLLOWING_EXCEPTION_OCCURRED_WHILE_PROCESSING_A_BATCH),
+                  ex);
+        }
+        this.processor.setIsStopped(true);
+      }
+    }
+    return ack;
+  }
+  
+  @Override
+  public boolean dispatchBatch(List events, boolean removeFromQueueOnException, boolean isRetry) {
+    GatewaySenderStats statistics = this.sender.getStatistics();
+    boolean success = false;
+    try {
+      long start = statistics.startTime();
+      success =_dispatchBatch(events, isRetry);
+      statistics.endBatch(start, events.size());
+    } catch (GatewaySenderException ge) {
+
+      Throwable t = ge.getCause();
+      if (this.sender.getProxy() == null || this.sender.getProxy().isDestroyed()) {
+        // if our pool is shutdown then just be silent
+      } else if (t instanceof IOException
+          || t instanceof ServerConnectivityException
+          || t instanceof ConnectionDestroyedException) {
+        this.processor.handleException();
+        // If the cause is an IOException or a ServerException, sleep and retry.
+        // Sleep for a bit and recheck.
+        try {
+          Thread.sleep(100);
+        } catch (InterruptedException ie) {
+          Thread.currentThread().interrupt();
+        }
+        if (logger.isDebugEnabled()) {
+          logger.debug("Because of IOException, failed to dispatch a batch with id : {}", this.processor.getBatchId());
+        }
+      }
+      else {
+        logger.fatal(LocalizedMessage.create(
+            LocalizedStrings.GatewayEventRemoteDispatcher_STOPPING_THE_PROCESSOR_BECAUSE_THE_FOLLOWING_EXCEPTION_OCCURRED_WHILE_PROCESSING_A_BATCH), ge);
+        this.processor.setIsStopped(true);
+      }
+    }
+    catch (CancelException e) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("Stopping the processor because cancellation occurred while processing a batch");
+      }
+      this.processor.setIsStopped(true);
+      throw e;
+    } catch (Exception e) {
+      this.processor.setIsStopped(true);
+      logger.fatal(LocalizedMessage.create(
+              LocalizedStrings.GatewayEventRemoteDispatcher_STOPPING_THE_PROCESSOR_BECAUSE_THE_FOLLOWING_EXCEPTION_OCCURRED_WHILE_PROCESSING_A_BATCH),
+              e);
+    }
+    return success;
+  }
+
+  private boolean _dispatchBatch(List events, boolean isRetry) {
+    Exception ex = null;
+    int currentBatchId = this.processor.getBatchId();
+    connection = getConnection(true);
+    int batchIdForThisConnection = this.processor.getBatchId();
+    // This means we are writing to a new connection than the previous batch.
+    // i.e The connection has been reset. It also resets the batchId.
+    if (currentBatchId != batchIdForThisConnection
+        || this.processor.isConnectionReset()) {
+      return false;
+    }
+    try {
+      if (this.processor.isConnectionReset()) {
+        isRetry = true;
+      }
+      SenderProxy sp = new SenderProxy(this.sender.getProxy());
+      this.connectionLifeCycleLock.readLock().lock();
+      try {
+        if (connection != null) {
+          sp.dispatchBatch_NewWAN(connection, events, currentBatchId,
+              sender.isRemoveFromQueueOnException(), isRetry);
+          if (logger.isDebugEnabled()) {
+            logger.debug("{} : Dispatched batch (id={}) of {} events, queue size: {} on connection {}",
+                this.processor.getSender(), currentBatchId,  events.size(), this.processor.getQueue().size(), connection);
+          }
+        } else {
+          throw new ConnectionDestroyedException();
+        }
+      }
+      finally{
+        this.connectionLifeCycleLock.readLock().unlock();
+      }
+      return true;
+    }
+    catch (ServerOperationException e) {
+      Throwable t = e.getCause();
+      if (t instanceof BatchException70) {
+        // A BatchException has occurred.
+        // Do not process the connection as dead since it is not dead.
+        ex = (BatchException70)t;
+      }
+      else {
+        ex = e;
+        // keep using the connection if we had a batch exception. Else, destroy it
+        destroyConnection();
+      }
+      throw new GatewaySenderException(
+          LocalizedStrings.GatewayEventRemoteDispatcher_0_EXCEPTION_DURING_PROCESSING_BATCH_1_ON_CONNECTION_2.toLocalizedString(
+              new Object[] {this, Integer.valueOf(currentBatchId), connection}), ex);
+    }
+    catch (Exception e) {
+      // An Exception has occurred. Get its cause.
+      Throwable t = e.getCause();
+      if (t instanceof IOException) {
+        // An IOException has occurred.
+        ex = (IOException)t;
+      } else {
+        ex = e;
+      }
+      //the cause is not going to be BatchException70. So, destroy the connection
+      destroyConnection();
+      
+      throw new GatewaySenderException(
+          LocalizedStrings.GatewayEventRemoteDispatcher_0_EXCEPTION_DURING_PROCESSING_BATCH_1_ON_CONNECTION_2.toLocalizedString(
+              new Object[] {this, Integer.valueOf(currentBatchId), connection}), ex);
+    }
+  }
+  
+  /**
+   * Acquires or adds a new <code>Connection</code> to the corresponding
+   * <code>Gateway</code>
+   *
+   * @return the <code>Connection</code>
+   *
+   * @throws GatewaySenderException
+   * @throws InterruptedException 
+   */
+  public Connection getConnection(boolean startAckReaderThread) throws GatewaySenderException{
+    // IF the connection is null 
+    // OR the connection's ServerLocation doesn't match with the one stored in sender
+    // THEN initialize the connection
+    if(!this.sender.isParallel()) {
+      if (this.connection == null || this.connection.isDestroyed()
+          || !this.connection.getServer().equals(this.sender.getServerLocation())) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("Initializing new connection as serverLocation of old connection is : {} and the serverLocation to connect is {}",
+              ((this.connection == null) ? "null" : this.connection.getServer()),
+              this.sender.getServerLocation());
+        }
+        // Initialize the connection
+        initializeConnection();
+      }
+    } else {
+      if (this.connection == null || this.connection.isDestroyed()) {
+        initializeConnection();
+      }
+    }
+    
+    // Here we might wait on a connection to another server if I was secondary
+    // so don't start waiting until I am primary
+    Cache cache = this.sender.getCache();
+    if (cache != null && !cache.isClosed()) {
+      if (this.sender.isPrimary() && (this.connection != null)) {
+        if (this.ackReaderThread == null || !this.ackReaderThread.isRunning()) {
+          this.ackReaderThread = new AckReaderThread(this.sender);
+          this.ackReaderThread.start();
+          this.ackReaderThread.waitForRunningAckReaderThreadRunningState();
+        }
+      }
+    }
+    return this.connection;
+  }
+  
+  public void destroyConnection() {
+    this.connectionLifeCycleLock.writeLock().lock();
+    try {
+      Connection con = this.connection;
+      if (con != null) {
+        if (!con.isDestroyed()) {
+          con.destroy();
+         this.sender.getProxy().returnConnection(con);
+        }
+        
+        // Reset the connection so the next time through a new one will be
+        // obtained
+        this.connection = null;
+        this.sender.setServerLocation(null);
+      }
+    }
+    finally {
+      this.connectionLifeCycleLock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Initializes the <code>Connection</code>.
+   *
+   * @throws GatewaySenderException
+   */
+  private void initializeConnection() throws GatewaySenderException,
+      GemFireSecurityException {
+    this.connectionLifeCycleLock.writeLock().lock(); 
+    try {
+      // Attempt to acquire a connection
+      if (this.sender.getProxy() == null
+          || this.sender.getProxy().isDestroyed()) {
+        this.sender.initProxy();
+      } else {
+        this.processor.resetBatchId();
+      }
+      Connection con;
+      try {
+        if (this.sender.isParallel()) {
+          con = this.sender.getProxy().acquireConnection();
+          // For parallel sender, setting server location will not matter.
+          // everytime it will ask for acquire connection whenever it needs it. I
+          // am saving this server location for command purpose
+          sender.setServerLocation(con.getServer());  
+        } else {
+          synchronized (this.sender
+              .getLockForConcurrentDispatcher()) {
+            if (this.sender.getServerLocation() != null) {
+              if (logger.isDebugEnabled()) {
+                logger.debug("ServerLocation is: {}. Connecting to this serverLocation...", sender.getServerLocation());
+              }
+              con = this.sender.getProxy().acquireConnection(
+                  this.sender.getServerLocation());
+            } else {
+              if (logger.isDebugEnabled()) {
+                logger.debug("ServerLocation is null. Creating new connection. ");
+              }
+              con = this.sender.getProxy().acquireConnection();
+              // Acquired connection from pool!! Update the server location
+              // information in the sender and
+              // distribute the information to other senders ONLY IF THIS SENDER
+              // IS
+              // PRIMARY
+              if (this.sender.isPrimary()) {
+                if (sender.getServerLocation() == null) {
+                  sender.setServerLocation(con.getServer());
+                }
+                new UpdateAttributesProcessor(this.sender).distribute(false);
+              }
+            }
+          }
+        }
+      } catch (ServerConnectivityException e) {
+        this.failedConnectCount++;
+        Throwable ex = null;
+
+        if (e.getCause() instanceof GemFireSecurityException) {
+          ex = e.getCause();
+          if (logConnectionFailure()) {
+            // only log this message once; another msg is logged once we connect
+            logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayEventRemoteDispatcher_0_COULD_NOT_CONNECT_1,
+                new Object[] { this.processor.getSender().getId(), ex.getMessage() }));
+          }
+          throw new GatewaySenderException(ex);
+        }
+        List<ServerLocation> servers = this.sender.getProxy()
+            .getCurrentServers();
+        String ioMsg = null;
+        if (servers.size() == 0) {
+          ioMsg = LocalizedStrings.GatewayEventRemoteDispatcher_THERE_ARE_NO_ACTIVE_SERVERS
+              .toLocalizedString();
+        } else {
+          final StringBuilder buffer = new StringBuilder();
+          for (ServerLocation server : servers) {
+            String endpointName = String.valueOf(server);
+            if (buffer.length() > 0) {
+              buffer.append(", ");
+            }
+            buffer.append(endpointName);
+          }
+          ioMsg = LocalizedStrings.GatewayEventRemoteDispatcher_NO_AVAILABLE_CONNECTION_WAS_FOUND_BUT_THE_FOLLOWING_ACTIVE_SERVERS_EXIST_0
+              .toLocalizedString(buffer.toString());
+        }
+        ex = new IOException(ioMsg);
+        // Set the serverLocation to null so that a new connection can be
+        // obtained in next attempt
+        this.sender.setServerLocation(null);
+        if (this.failedConnectCount == 1) {
+          // only log this message once; another msg is logged once we connect
+          logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayEventRemoteDispatcher__0___COULD_NOT_CONNECT,
+                  this.processor.getSender().getId()));
+
+        }
+        // Wrap the IOException in a GatewayException so it can be processed the
+        // same as the other exceptions that might occur in sendBatch.
+        throw new GatewaySenderException(
+            LocalizedStrings.GatewayEventRemoteDispatcher__0___COULD_NOT_CONNECT
+                .toLocalizedString(this.processor.getSender().getId()), ex);
+      }
+      if (this.failedConnectCount > 0) {
+        Object[] logArgs = new Object[] { this.processor.getSender().getId(),
+            con, Integer.valueOf(this.failedConnectCount) };
+        logger.info(LocalizedMessage.create(LocalizedStrings.GatewayEventRemoteDispatcher_0_USING_1_AFTER_2_FAILED_CONNECT_ATTEMPTS,
+                logArgs));
+        this.failedConnectCount = 0;
+      } else {
+        Object[] logArgs = new Object[] { this.processor.getSender().getId(),
+            con };
+        logger.info(LocalizedMessage.create(LocalizedStrings.GatewayEventRemoteDispatcher_0_USING_1, logArgs));
+      }
+      this.connection = con;
+      this.processor.checkIfPdxNeedsResend(this.connection.getQueueStatus().getPdxSize());
+    }
+    catch (ConnectionDestroyedException e) {
+      throw new GatewaySenderException(
+          LocalizedStrings.GatewayEventRemoteDispatcher__0___COULD_NOT_CONNECT.toLocalizedString(this.processor
+              .getSender().getId()), e);
+    }
+    finally {
+      this.connectionLifeCycleLock.writeLock().unlock();
+    }
+  }
+
+  protected boolean logConnectionFailure() {
+    // always log the first failure
+	if (logger.isDebugEnabled() || this.failedConnectCount == 0) {
+	  return true;
+	}
+	else {
+	  // subsequent failures will be logged on 30th, 300th, 3000th try
+	  // each try is at 100millis from higher layer so this accounts for logging
+	  // after 3s, 30s and then every 5mins
+	  if (this.failedConnectCount >= 3000) {
+	    return (this.failedConnectCount % 3000) == 0;
+	  }
+	  else {
+	    return (this.failedConnectCount == 30 || this.failedConnectCount == 300);
+	  }
+    }
+  }
+  
+  public static class GatewayAck {
+    private int batchId;
+
+    private int numEvents;
+
+    private BatchException70 be;
+
+    public GatewayAck(BatchException70 be, int bId) {
+      this.be = be;
+      this.batchId = bId;
+    }
+
+    public GatewayAck(int batchId, int numEvents) {
+      this.batchId = batchId;
+      this.numEvents = numEvents;
+    }
+
+    /**
+     * @return the numEvents
+     */
+    public int getNumEvents() {
+      return numEvents;
+    }
+
+    /**
+     * @return the batchId
+     */
+    public int getBatchId() {
+      return batchId;
+    }
+
+    public BatchException70 getBatchException() {
+      return this.be;
+    }
+  }
+    
+  class AckReaderThread extends Thread {
+
+    private Object runningStateLock = new Object();
+
+    /**
+     * boolean to make a shutdown request
+     */
+    private volatile boolean shutdown = false;
+
+    private final GemFireCacheImpl cache;
+
+    private volatile boolean ackReaderThreadRunning = false;
+
+    public AckReaderThread(GatewaySender sender) {
+      super("AckReaderThread for : " + sender.getId());
+      this.setDaemon(true);
+      this.cache = (GemFireCacheImpl)((AbstractGatewaySender)sender).getCache();
+    }
+
+    public void waitForRunningAckReaderThreadRunningState() {
+      synchronized (runningStateLock) {
+        while (!this.ackReaderThreadRunning) {
+          try {
+            this.runningStateLock.wait();
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            break;
+          }
+        }
+      }
+    }
+
+    private boolean checkCancelled() {
+      if (shutdown) {
+        return true;
+      }
+
+      if (cache.getCancelCriterion().cancelInProgress() != null) {
+        return true;
+      }
+      return false;
+    }
+
+    @Override
+    public void run() {
+      int lastBatchIdRead = -1;
+      if (logger.isDebugEnabled()) {
+        logger.debug("AckReaderThread started.. ");
+      }
+
+      synchronized (runningStateLock) {
+        ackReaderThreadRunning = true;
+        this.runningStateLock.notifyAll();
+      }
+
+      try {
+        for (;;) {
+          if (checkCancelled()) {
+            break;
+          }
+          GatewayAck ack = readAcknowledgement(lastBatchIdRead);
+          if (ack != null) {
+            boolean gotBatchException = ack.getBatchException() != null;
+            int batchId = ack.getBatchId();
+            lastBatchIdRead = batchId;
+            int numEvents = ack.getNumEvents();
+
+            // If the batch is successfully processed, remove it from the
+            // queue.
+            if (gotBatchException) {
+              logger.info(LocalizedMessage.create(
+                  LocalizedStrings.GatewaySenderEventRemoteDispatcher_GATEWAY_SENDER_0_RECEIVED_ACK_FOR_BATCH_ID_1_WITH_EXCEPTION,
+                      new Object[] { processor.getSender(), ack.getBatchId() }, ack.getBatchException()));
+              // If we get PDX related exception in the batch exception then try
+              // to resend all the pdx events as well in the next batch.
+              final GatewaySenderStats statistics = sender.getStatistics();
+              statistics.incBatchesRedistributed();
+              // log batch exceptions and remove all the events if remove from
+              // exception is true
+              // do not remove if it is false
+              if (sender.isRemoveFromQueueOnException()) {
+                // log the batchExceptions
+                logBatchExceptions(ack.getBatchException());
+                  processor.handleSuccessBatchAck(batchId);
+              } else {
+                // we assume that batch exception will not occur for PDX related
+                // events
+                List<GatewaySenderEventImpl> pdxEvents = processor
+                    .getBatchIdToPDXEventsMap().get(
+                        ack.getBatchException().getBatchId());
+                if (pdxEvents != null) {
+                  for (GatewaySenderEventImpl senderEvent : pdxEvents) {
+                    senderEvent.isAcked = true;
+                  }
+                }
+                // log the batchExceptions
+                logBatchExceptions(ack.getBatchException());
+                // remove the events that have been processed.
+                BatchException70 be = ack.getBatchException();
+                List<BatchException70> exceptions = be.getExceptions();
+
+                for (int i = 0; i < exceptions.get(0).getIndex(); i++) {
+                  processor.eventQueueRemove();
+                }
+                // reset the sender
+                processor.handleException();
+              }
+            } // unsuccessful batch
+            else { // The batch was successful.
+              if (logger.isDebugEnabled()) {
+                logger.debug("Gateway Sender {} : Received ack for batch id {} of {} events",
+                    processor.getSender(), ack.getBatchId(), ack.getNumEvents());
+              }
+              processor.handleSuccessBatchAck(batchId);
+            }
+          } else {
+            // If we have received IOException.
+            if (logger.isDebugEnabled()) {
+              logger.debug("{}: Received null ack from remote site.", processor.getSender());
+            }
+            processor.handleException();
+            try { // This wait is before trying to getting new connection to
+                  // receive ack. Without this there will be continuous call to
+                  // getConnection
+              Thread.sleep(GatewaySender.CONNECTION_RETRY_INTERVAL);
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+            }
+          }
+        }
+      } catch (Exception e) {
+        if (!checkCancelled()) {
+          logger.fatal(LocalizedMessage.create(
+              LocalizedStrings.GatewayEventRemoteDispatcher_STOPPING_THE_PROCESSOR_BECAUSE_THE_FOLLOWING_EXCEPTION_OCCURRED_WHILE_PROCESSING_A_BATCH) ,e);
+        }
+        sender.lifeCycleLock.writeLock().lock();
+        try {
+          processor.stopProcessing();
+          sender.clearTempEventsAfterSenderStopped();
+        } finally {
+          sender.lifeCycleLock.writeLock().unlock();
+        }
+        // destroyConnection();
+      } finally {
+        if (logger.isDebugEnabled()) {
+          logger.debug("AckReaderThread exiting. ");
+        }
+        ackReaderThreadRunning = false;
+      }
+
+    }
+
+    /**
+     * @param exception 
+     * 
+     */
+    private void logBatchExceptions(BatchException70 exception) {
+      for (BatchException70 be : exception.getExceptions()) {
+        boolean logWarning = true;
+        if (be.getCause() instanceof RegionDestroyedException) {
+          RegionDestroyedException rde = (RegionDestroyedException)be
+              .getCause();
+          synchronized (notFoundRegionsSync) {
+            if (notFoundRegions.contains(rde.getRegionFullPath())) {
+              logWarning = false;
+            } else {
+              notFoundRegions.add(rde.getRegionFullPath());
+            }
+          }
+        } else if (be.getCause() instanceof IllegalStateException
+            && be.getCause().getMessage().contains("Unknown pdx type")) {
+          List<GatewaySenderEventImpl> pdxEvents = processor
+              .getBatchIdToPDXEventsMap().get(be.getBatchId());
+          if (logWarning) {
+            logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayEventRemoteDispatcher_A_BATCHEXCEPTION_OCCURRED_PROCESSING_PDX_EVENT__0,
+                    be.getIndex()), be);
+          }
+          if (pdxEvents != null) {
+            for (GatewaySenderEventImpl senderEvent : pdxEvents) {
+              senderEvent.isAcked = false;
+            }
+            GatewaySenderEventImpl gsEvent = pdxEvents.get(be.getIndex());
+            if (logWarning) {
+              logger.warn(LocalizedMessage.create(
+                  LocalizedStrings.GatewayEventRemoteDispatcher_THE_EVENT_BEING_PROCESSED_WHEN_THE_BATCHEXCEPTION_OCCURRED_WAS__0, gsEvent));
+            }
+          }
+          continue;
+        }
+        if (logWarning) {
+          logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayEventRemoteDispatcher_A_BATCHEXCEPTION_OCCURRED_PROCESSING_EVENT__0,
+              be.getIndex()), be);
+        }
+        List<GatewaySenderEventImpl>[] eventsArr = processor.getBatchIdToEventsMap().get(be.getBatchId()); 
+        if (eventsArr != null) {
+          List<GatewaySenderEventImpl> filteredEvents = eventsArr[1];
+          GatewaySenderEventImpl gsEvent = (GatewaySenderEventImpl)filteredEvents
+              .get(be.getIndex());
+          if (logWarning) {
+            logger.warn(LocalizedMessage.create(
+                LocalizedStrings.GatewayEventRemoteDispatcher_THE_EVENT_BEING_PROCESSED_WHEN_THE_BATCHEXCEPTION_OCCURRED_WAS__0, gsEvent));
+          }
+        }
+      }
+    }
+
+    boolean isRunning() {
+      return this.ackReaderThreadRunning;
+    }
+
+    public void shutdown() {
+      // we need to destroy connection irrespective of we are listening on it or
+      // not. No need to take lock as the reader thread may be blocked and we might not
+      // get chance to destroy unless that returns.
+      if (connection != null) {
+        if (!connection.isDestroyed()) {
+          connection.destroy();
+          sender.getProxy().returnConnection(connection);
+        }
+      }
+      this.shutdown = true;
+      boolean interrupted = Thread.interrupted();
+      try {
+        this.join(15 * 1000);
+      } catch (InterruptedException e) {
+        interrupted = true;
+      } finally {
+        if (interrupted) {
+          Thread.currentThread().interrupt();
+        }
+      }
+      if (this.isAlive()) {
+        logger.warn(LocalizedMessage.create(LocalizedStrings.GatewaySender_ACKREADERTHREAD_IGNORED_CANCELLATION));
+      }
+    }
+  }
+    
+  public void stopAckReaderThread() {
+    if (this.ackReaderThread != null) {
+      this.ackReaderThread.shutdown();
+    }    
+  }
+  
+  @Override
+  public boolean isRemoteDispatcher() {
+    return true;
+  }
+
+  @Override
+  public boolean isConnectedToRemote() {
+      return connection != null;
+  }
+  
+  public void stop() {
+    stopAckReaderThread();
+    if(this.processor.isStopped()) {
+      destroyConnection();
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderFactoryImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderFactoryImpl.java b/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderFactoryImpl.java
new file mode 100644
index 0000000..33128c8
--- /dev/null
+++ b/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderFactoryImpl.java
@@ -0,0 +1,381 @@
+/*
+ * =========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved. 
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ * ========================================================================
+ */
+package com.gemstone.gemfire.internal.cache.wan;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
+import com.gemstone.gemfire.cache.client.internal.LocatorDiscoveryCallback;
+import com.gemstone.gemfire.cache.wan.GatewayEventFilter;
+import com.gemstone.gemfire.cache.wan.GatewayEventSubstitutionFilter;
+import com.gemstone.gemfire.cache.wan.GatewaySender;
+import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy;
+import com.gemstone.gemfire.cache.wan.GatewaySenderFactory;
+import com.gemstone.gemfire.cache.wan.GatewayTransportFilter;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderImpl;
+import com.gemstone.gemfire.internal.cache.wan.serial.SerialGatewaySenderImpl;
+import com.gemstone.gemfire.internal.cache.xmlcache.CacheCreation;
+import com.gemstone.gemfire.internal.cache.xmlcache.ParallelGatewaySenderCreation;
+import com.gemstone.gemfire.internal.cache.xmlcache.SerialGatewaySenderCreation;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+
+/**
+ * @author Suranjan Kumar
+ * @author Yogesh Mahajan
+ * @author Kishor Bachhav
+ * 
+ * @since 7.0
+ * 
+ */
+public class GatewaySenderFactoryImpl implements
+    InternalGatewaySenderFactory {
+
+  private static final Logger logger = LogService.getLogger();
+  
+  /**
+   * Used internally to pass the attributes from this factory to the real
+   * GatewaySender it is creating.
+   */
+  private GatewaySenderAttributes attrs = new GatewaySenderAttributes();
+
+  private Cache cache;
+
+  private static final AtomicBoolean GATEWAY_CONNECTION_READ_TIMEOUT_PROPERTY_CHECKED = new AtomicBoolean(false);
+
+  public GatewaySenderFactoryImpl(Cache cache) {
+    this.cache = cache;
+  }
+
+  public GatewaySenderFactory setParallel(boolean isParallel){
+    this.attrs.isParallel = isParallel;
+    return this;
+  }
+  
+  public GatewaySenderFactory setForInternalUse(boolean isForInternalUse) {
+    this.attrs.isForInternalUse = isForInternalUse;
+    return this;
+  }
+  
+  public GatewaySenderFactory addGatewayEventFilter(
+      GatewayEventFilter filter) {
+    this.attrs.addGatewayEventFilter(filter);
+    return this;
+  }
+
+  public GatewaySenderFactory addGatewayTransportFilter(
+      GatewayTransportFilter filter) {
+    this.attrs.addGatewayTransportFilter(filter);
+    return this;
+  }
+
+  public GatewaySenderFactory addAsyncEventListener(
+      AsyncEventListener listener) {
+    this.attrs.addAsyncEventListener(listener);
+    return this;
+  }
+  
+  public GatewaySenderFactory setSocketBufferSize(int socketBufferSize) {
+    this.attrs.socketBufferSize = socketBufferSize;
+    return this;
+  }
+
+  public GatewaySenderFactory setSocketReadTimeout(int socketReadTimeout) {
+    this.attrs.socketReadTimeout = socketReadTimeout;
+    return this;
+  }
+
+  public GatewaySenderFactory setDiskStoreName(String diskStoreName) {
+    this.attrs.diskStoreName = diskStoreName;
+    return this;
+  }
+
+  public GatewaySenderFactory setMaximumQueueMemory(int maximumQueueMemory) {
+    this.attrs.maximumQueueMemory = maximumQueueMemory;
+    return this;
+  }
+
+  public GatewaySenderFactory setBatchSize(int batchSize) {
+    this.attrs.batchSize = batchSize;
+    return this;
+  }
+
+  public GatewaySenderFactory setBatchTimeInterval(int batchTimeInterval) {
+    this.attrs.batchTimeInterval = batchTimeInterval;
+    return this;
+  }
+
+  public GatewaySenderFactory setBatchConflationEnabled(
+      boolean enableBatchConflation) {
+    this.attrs.isBatchConflationEnabled = enableBatchConflation;
+    return this;
+  }
+
+  public GatewaySenderFactory setPersistenceEnabled(
+      boolean enablePersistence) {
+    this.attrs.isPersistenceEnabled = enablePersistence;
+    return this;
+  }
+
+  public GatewaySenderFactory setAlertThreshold(int threshold) {
+    this.attrs.alertThreshold = threshold;
+    return this;
+  }
+
+  public GatewaySenderFactory setManualStart(boolean start) {
+    this.attrs.manualStart = start;
+    return this;
+  }
+
+  public GatewaySenderFactory setLocatorDiscoveryCallback(
+      LocatorDiscoveryCallback locCallback) {
+    this.attrs.locatorDiscoveryCallback = locCallback;
+    return this;
+  }
+
+  @Override
+  public GatewaySenderFactory setDiskSynchronous(boolean isSynchronous) {
+    this.attrs.isDiskSynchronous = isSynchronous;
+    return this;
+  }
+  
+  @Override
+  public GatewaySenderFactory setDispatcherThreads(int numThreads) {
+    if ((numThreads > 1) && this.attrs.policy == null) {
+      this.attrs.policy = GatewaySender.DEFAULT_ORDER_POLICY;
+    }
+    this.attrs.dispatcherThreads = numThreads;
+    return this;
+  }
+
+  public GatewaySenderFactory setParallelFactorForReplicatedRegion(int parallel) {
+    this.attrs.parallelism = parallel;
+    this.attrs.policy = GatewaySender.DEFAULT_ORDER_POLICY;
+    return this;
+  }    
+    
+  @Override
+  public GatewaySenderFactory setOrderPolicy(OrderPolicy policy) {
+    this.attrs.policy = policy;
+    return this;
+  }
+  
+  public GatewaySenderFactory setBucketSorted(boolean isBucketSorted){
+    this.attrs.isBucketSorted = isBucketSorted;
+    return this;
+  }
+  public GatewaySenderFactory setIsHDFSQueue(boolean isHDFSQueue){
+    this.attrs.isHDFSQueue = isHDFSQueue;
+    return this;
+  }
+  public GatewaySender create(String id, int remoteDSId) {
+    int myDSId = InternalDistributedSystem.getAnyInstance()
+        .getDistributionManager().getDistributedSystemId();
+    if (remoteDSId == myDSId) {
+      throw new GatewaySenderException(
+          LocalizedStrings.GatewaySenderImpl_GATEWAY_0_CANNOT_BE_CREATED_WITH_REMOTE_SITE_ID_EQUAL_TO_THIS_SITE_ID
+              .toLocalizedString(id));
+    }
+    if (remoteDSId < 0) {
+      throw new GatewaySenderException(
+          LocalizedStrings.GatewaySenderImpl_GATEWAY_0_CANNOT_BE_CREATED_WITH_REMOTE_SITE_ID_LESS_THAN_ZERO
+              .toLocalizedString(id));
+    }
+    this.attrs.id = id;
+    this.attrs.remoteDs = remoteDSId;
+    GatewaySender sender = null;
+
+    if(this.attrs.getDispatcherThreads() <= 0){
+      throw new GatewaySenderException(
+          LocalizedStrings.GatewaySenderImpl_GATEWAY_SENDER_0_CANNOT_HAVE_DISPATCHER_THREADS_LESS_THAN_1
+              .toLocalizedString(id));
+    }
+
+    // Verify socket read timeout if a proper logger is available
+    if (this.cache instanceof GemFireCacheImpl) {
+      // If socket read timeout is less than the minimum, log a warning.
+      // Ideally, this should throw a GatewaySenderException, but wan dunit tests
+      // were failing, and we were running out of time to change them.
+      if (this.attrs.getSocketReadTimeout() != 0
+          && this.attrs.getSocketReadTimeout() < GatewaySender.MINIMUM_SOCKET_READ_TIMEOUT) {
+        logger.warn(LocalizedMessage.create(LocalizedStrings.Gateway_CONFIGURED_SOCKET_READ_TIMEOUT_TOO_LOW,
+            new Object[] { "GatewaySender " + id, this.attrs.getSocketReadTimeout(), GatewaySender.MINIMUM_SOCKET_READ_TIMEOUT }));
+        this.attrs.socketReadTimeout = GatewaySender.MINIMUM_SOCKET_READ_TIMEOUT;
+      }
+
+      // Log a warning if the old system property is set.
+      if (GATEWAY_CONNECTION_READ_TIMEOUT_PROPERTY_CHECKED.compareAndSet(false, true)) {
+        if (System.getProperty(GatewaySender.GATEWAY_CONNECTION_READ_TIMEOUT_PROPERTY) != null) {
+          logger.warn(LocalizedMessage.create(LocalizedStrings.Gateway_OBSOLETE_SYSTEM_POPERTY,
+              new Object[] { GatewaySender.GATEWAY_CONNECTION_READ_TIMEOUT_PROPERTY, "GatewaySender socket read timeout" }));
+        }
+      }
+    }
+
+    if (this.attrs.isParallel()) {
+//      if(this.attrs.getDispatcherThreads() != 1){
+//        throw new GatewaySenderException(
+//            LocalizedStrings.GatewaySenderImpl_PARALLEL_GATEWAY_SENDER_0_CANNOT_BE_CREATED_WITH_DISPATHER_THREADS_OTHER_THAN_1
+//                .toLocalizedString(id));
+//      }
+      if ((this.attrs.getOrderPolicy() != null)
+          && this.attrs.getOrderPolicy().equals(OrderPolicy.THREAD)) {
+        throw new GatewaySenderException(
+            LocalizedStrings.GatewaySenderImpl_PARALLEL_GATEWAY_SENDER_0_CANNOT_BE_CREATED_WITH_ORDER_POLICY_1
+                .toLocalizedString(id, this.attrs.getOrderPolicy()));
+      }
+      if (this.cache instanceof GemFireCacheImpl) {
+        sender = new ParallelGatewaySenderImpl(this.cache, this.attrs);
+        ((GemFireCacheImpl)this.cache).addGatewaySender(sender);
+        
+        if (!this.attrs.isManualStart()) {
+          sender.start();
+        }
+      }
+      else if (this.cache instanceof CacheCreation) {
+        sender = new ParallelGatewaySenderCreation(this.cache, this.attrs);
+        ((CacheCreation)this.cache).addGatewaySender(sender);
+      }
+    }
+    else {
+      if (this.attrs.getAsyncEventListeners().size() > 0) {
+        throw new GatewaySenderException(
+            LocalizedStrings.SerialGatewaySenderImpl_GATEWAY_0_CANNOT_DEFINE_A_REMOTE_SITE_BECAUSE_AT_LEAST_ONE_LISTENER_IS_ALREADY_ADDED
+                .toLocalizedString(id));
+      }
+//      if (this.attrs.getOrderPolicy() != null) {
+//        if (this.attrs.getDispatcherThreads() == GatewaySender.DEFAULT_DISPATCHER_THREADS) {
+//          throw new GatewaySenderException(
+//              LocalizedStrings.SerialGatewaySender_INVALID_GATEWAY_SENDER_ORDER_POLICY_CONCURRENCY_0
+//                  .toLocalizedString(id));
+//        }
+//      }
+      if (this.attrs.getOrderPolicy() == null && this.attrs.getDispatcherThreads() > 1) {
+        this.attrs.policy = GatewaySender.DEFAULT_ORDER_POLICY;
+      }
+      if (this.cache instanceof GemFireCacheImpl) {
+        sender = new SerialGatewaySenderImpl(this.cache, this.attrs);
+        ((GemFireCacheImpl)this.cache).addGatewaySender(sender);
+
+        if (!this.attrs.isManualStart()) {
+          sender.start();
+        }
+      }
+      else if (this.cache instanceof CacheCreation) {
+        sender = new SerialGatewaySenderCreation(this.cache, this.attrs);
+        ((CacheCreation)this.cache).addGatewaySender(sender);
+      }
+    }
+    return sender;
+  }
+
+  public GatewaySender create(String id) {
+    this.attrs.id = id;
+    GatewaySender sender = null;
+
+    if(this.attrs.getDispatcherThreads() <= 0) {
+      throw new AsyncEventQueueConfigurationException(
+          LocalizedStrings.AsyncEventQueue_0_CANNOT_HAVE_DISPATCHER_THREADS_LESS_THAN_1
+              .toLocalizedString(id));
+    }
+    
+    if (this.attrs.isParallel()) {
+      if ((this.attrs.getOrderPolicy() != null)
+          && this.attrs.getOrderPolicy().equals(OrderPolicy.THREAD)) {
+        throw new AsyncEventQueueConfigurationException(
+            LocalizedStrings.AsyncEventQueue_0_CANNOT_BE_CREATED_WITH_ORDER_POLICY_1
+                .toLocalizedString(id, this.attrs.getOrderPolicy()));
+      }
+      
+      if (this.cache instanceof GemFireCacheImpl) {
+        sender = new ParallelGatewaySenderImpl(this.cache, this.attrs);
+        ((GemFireCacheImpl)this.cache).addGatewaySender(sender);
+        if (!this.attrs.isManualStart()) {
+          sender.start();
+        }
+      }
+      else if (this.cache instanceof CacheCreation) {
+        sender = new ParallelGatewaySenderCreation(this.cache, this.attrs);
+        ((CacheCreation)this.cache).addGatewaySender(sender);
+      }
+    }
+    else {
+//      if (this.attrs.getOrderPolicy() != null) {
+//        if (this.attrs.getDispatcherThreads() == GatewaySender.DEFAULT_DISPATCHER_THREADS) {
+//          throw new AsyncEventQueueConfigurationException(
+//              LocalizedStrings.AsyncEventQueue_INVALID_ORDER_POLICY_CONCURRENCY_0
+//                  .toLocalizedString(id));
+//        }
+//      }
+      if (this.attrs.getOrderPolicy() == null && this.attrs.getDispatcherThreads() > 1) {
+         this.attrs.policy = GatewaySender.DEFAULT_ORDER_POLICY;
+      }
+      if (this.cache instanceof GemFireCacheImpl) {
+        sender = new SerialGatewaySenderImpl(this.cache, this.attrs);
+        ((GemFireCacheImpl)this.cache).addGatewaySender(sender);
+        if (!this.attrs.isManualStart()) {
+          sender.start();
+        }
+      }
+      else if (this.cache instanceof CacheCreation) {
+        sender = new SerialGatewaySenderCreation(this.cache, this.attrs);
+        ((CacheCreation)this.cache).addGatewaySender(sender);
+      }
+    }
+    return sender;
+  }
+  
+  public GatewaySenderFactory removeGatewayEventFilter(
+      GatewayEventFilter filter) {
+    this.attrs.eventFilters.remove(filter);
+    return this;
+  }
+
+  public GatewaySenderFactory removeGatewayTransportFilter(
+      GatewayTransportFilter filter) {
+    this.attrs.transFilters.remove(filter);
+    return this;
+  } 
+  
+  public GatewaySenderFactory setGatewayEventSubstitutionFilter(
+      GatewayEventSubstitutionFilter filter) {
+    this.attrs.eventSubstitutionFilter = filter;
+    return this;
+  }
+
+  public void configureGatewaySender(GatewaySender senderCreation) {
+    this.attrs.isParallel = senderCreation.isParallel();
+    this.attrs.manualStart = senderCreation.isManualStart();
+    this.attrs.socketBufferSize = senderCreation.getSocketBufferSize();
+    this.attrs.socketReadTimeout = senderCreation.getSocketReadTimeout();
+    this.attrs.isBatchConflationEnabled = senderCreation.isBatchConflationEnabled();
+    this.attrs.batchSize = senderCreation.getBatchSize();
+    this.attrs.batchTimeInterval = senderCreation.getBatchTimeInterval();
+    this.attrs.isPersistenceEnabled = senderCreation.isPersistenceEnabled();
+    this.attrs.diskStoreName = senderCreation.getDiskStoreName();
+    this.attrs.isDiskSynchronous = senderCreation.isDiskSynchronous();
+    this.attrs.maximumQueueMemory = senderCreation.getMaximumQueueMemory();
+    this.attrs.alertThreshold = senderCreation.getAlertThreshold();
+    this.attrs.dispatcherThreads = senderCreation.getDispatcherThreads();
+    this.attrs.policy = senderCreation.getOrderPolicy();
+    for(GatewayEventFilter filter : senderCreation.getGatewayEventFilters()){
+      this.attrs.eventFilters.add(filter);
+    }
+    for(GatewayTransportFilter filter : senderCreation.getGatewayTransportFilters()){
+      this.attrs.transFilters.add(filter);
+    }
+    this.attrs.eventSubstitutionFilter = senderCreation.getGatewayEventSubstitutionFilter();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java b/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java
new file mode 100644
index 0000000..74501fb
--- /dev/null
+++ b/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java
@@ -0,0 +1,261 @@
+/*
+ * =========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved. 
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ * ========================================================================
+ * 
+ */
+
+package com.gemstone.gemfire.internal.cache.wan.parallel;
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.EntryOperation;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
+import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl;
+import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueStats;
+import com.gemstone.gemfire.cache.wan.GatewaySender;
+import com.gemstone.gemfire.cache.wan.GatewayTransportFilter;
+import com.gemstone.gemfire.distributed.internal.DistributionAdvisor.Profile;
+import com.gemstone.gemfire.distributed.internal.DistributionAdvisor;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.distributed.internal.ResourceEvent;
+import com.gemstone.gemfire.internal.cache.DistributedRegion;
+import com.gemstone.gemfire.internal.cache.EntryEventImpl;
+import com.gemstone.gemfire.internal.cache.EventID;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.PartitionedRegionHelper;
+import com.gemstone.gemfire.internal.cache.UpdateAttributesProcessor;
+import com.gemstone.gemfire.internal.cache.ha.ThreadIdentifier;
+import com.gemstone.gemfire.internal.cache.wan.AbstractRemoteGatewaySender;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderAdvisor;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderStats;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderAdvisor.GatewaySenderProfile;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderAttributes;
+import com.gemstone.gemfire.internal.cache.xmlcache.CacheCreation;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+
+/**
+ * @author Suranjan Kumar
+ * @author Yogesh Mahajan
+ * @since 7.0
+ *
+ */
+public class ParallelGatewaySenderImpl extends AbstractRemoteGatewaySender {
+  
+  private static final Logger logger = LogService.getLogger();
+  
+  final ThreadGroup loggerGroup = LoggingThreadGroup.createThreadGroup(
+      "Remote Site Discovery Logger Group", logger);
+  
+  public ParallelGatewaySenderImpl(){
+    super();
+    this.isParallel = true;
+  }
+  
+  public ParallelGatewaySenderImpl(Cache cache, GatewaySenderAttributes attrs) {
+    super(cache, attrs);
+  }
+  
+  @Override
+  public void start() {
+    this.lifeCycleLock.writeLock().lock(); 
+    try {
+      if (isRunning()) {
+        logger.warn(LocalizedMessage.create(LocalizedStrings.GatewaySender_SENDER_0_IS_ALREADY_RUNNING, this.getId()));
+        return;
+      }
+
+      if (this.remoteDSId != DEFAULT_DISTRIBUTED_SYSTEM_ID) {
+        String locators = ((GemFireCacheImpl)this.cache).getDistributedSystem()
+            .getConfig().getLocators();
+        if (locators.length() == 0) {
+          throw new IllegalStateException(
+              LocalizedStrings.AbstractGatewaySender_LOCATOR_SHOULD_BE_CONFIGURED_BEFORE_STARTING_GATEWAY_SENDER
+                  .toLocalizedString());
+        }
+      }
+      /*
+       * Now onwards all processing will happen through "ConcurrentParallelGatewaySenderEventProcessor"
+       * we have made "ParallelGatewaySenderEventProcessor" and "ParallelGatewaySenderQueue" as a
+       * utility classes of Concurrent version of processor and queue.
+       */
+      eventProcessor = new RemoteConcurrentParallelGatewaySenderEventProcessor(this);
+      /*if (getDispatcherThreads() > 1) {
+        eventProcessor = new ConcurrentParallelGatewaySenderEventProcessor(this);
+      } else {
+        eventProcessor = new ParallelGatewaySenderEventProcessor(this);
+      }*/
+      
+      eventProcessor.start();
+      waitForRunningStatus();
+      //Only notify the type registry if this is a WAN gateway queue
+      if(!isAsyncEventQueue()) {
+        ((GemFireCacheImpl) getCache()).getPdxRegistry().gatewaySenderStarted(this);
+      }
+      new UpdateAttributesProcessor(this).distribute(false);
+     
+      InternalDistributedSystem system = (InternalDistributedSystem) this.cache
+          .getDistributedSystem();
+      system.handleResourceEvent(ResourceEvent.GATEWAYSENDER_START, this);
+      
+      logger.info(LocalizedMessage.create(LocalizedStrings.ParallelGatewaySenderImpl_STARTED__0, this));
+      
+      if (!tmpQueuedEvents.isEmpty()) {
+        enqueueTempEvents();
+      }
+    }
+    finally {
+      this.lifeCycleLock.writeLock().unlock();
+    }
+  }
+  
+//  /**
+//   * The sender is not started but only the message queue i.e. shadowPR is created on the node.
+//   * @param targetPr
+//   */
+//  private void createMessageQueueOnAccessorNode(PartitionedRegion targetPr) {
+//    eventProcessor = new ParallelGatewaySenderEventProcessor(this, targetPr);
+//  }
+  
+
+  @Override
+  public void stop() {
+    this.lifeCycleLock.writeLock().lock(); 
+    try {
+      if (!this.isRunning()) {
+        return;
+      }
+      // Stop the dispatcher
+      AbstractGatewaySenderEventProcessor ev = this.eventProcessor;
+      //try {
+      if (ev != null && !ev.isStopped()) {
+        ev.stopProcessing();
+      }
+
+      // Stop the proxy (after the dispatcher, so the socket is still
+      // alive until after the dispatcher has stopped)
+      stompProxyDead();
+
+      // Close the listeners
+      for (AsyncEventListener listener : this.listeners) {
+        listener.close();
+      }
+      //stop the running threads, open sockets if any
+      ((ConcurrentParallelGatewaySenderQueue)this.eventProcessor.getQueue()).cleanUp();
+
+      logger.info(LocalizedMessage.create(LocalizedStrings.GatewayImpl_STOPPED__0, this));
+      
+      InternalDistributedSystem system = (InternalDistributedSystem) this.cache
+      .getDistributedSystem();
+      system.handleResourceEvent(ResourceEvent.GATEWAYSENDER_STOP, this);
+      
+      clearTempEventsAfterSenderStopped();
+      // Keep the eventProcessor around so we can ask it for the regionQueues later.
+      // Tests expect to be able to do this. 
+//      } finally {
+//        this.eventProcessor = null;
+//      }
+    }
+    finally {
+      this.lifeCycleLock.writeLock().unlock();
+    }
+  }
+  
+  @Override
+  public String toString() {
+    StringBuffer sb = new StringBuffer();
+    sb.append("ParallelGatewaySender{");
+    sb.append("id=" + getId());
+    sb.append(",remoteDsId="+ getRemoteDSId());
+    sb.append(",isRunning ="+ isRunning());
+    sb.append("}");
+    return sb.toString();
+  }
+
+  public void fillInProfile(Profile profile) {
+    assert profile instanceof GatewaySenderProfile;
+    GatewaySenderProfile pf = (GatewaySenderProfile)profile;
+    pf.Id = getId();
+    pf.remoteDSId = getRemoteDSId();
+    pf.isRunning = isRunning();
+    pf.isPrimary = isPrimary();
+    pf.isParallel = true;
+    pf.isBatchConflationEnabled = isBatchConflationEnabled();
+    pf.isPersistenceEnabled = isPersistenceEnabled();
+    pf.alertThreshold = getAlertThreshold();
+    pf.manualStart = isManualStart();
+    pf.dispatcherThreads = getDispatcherThreads();
+    pf.orderPolicy = getOrderPolicy();
+    for (com.gemstone.gemfire.cache.wan.GatewayEventFilter filter : getGatewayEventFilters()) {
+      pf.eventFiltersClassNames.add(filter.getClass().getName());
+    }
+    for (GatewayTransportFilter filter : getGatewayTransportFilters()) {
+      pf.transFiltersClassNames.add(filter.getClass().getName());
+    }
+    for (AsyncEventListener listener : getAsyncEventListeners()) {
+      pf.senderEventListenerClassNames.add(listener.getClass().getName());
+    }
+    pf.isDiskSynchronous = isDiskSynchronous();
+  }
+
+  /* (non-Javadoc)
+   * @see com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender#setModifiedEventId(com.gemstone.gemfire.internal.cache.EntryEventImpl)
+   */
+  @Override
+  protected void setModifiedEventId(EntryEventImpl clonedEvent) {
+    int bucketId = -1;
+    //merged from 42004
+    if (clonedEvent.getRegion() instanceof DistributedRegion) {
+//      if (getOrderPolicy() == OrderPolicy.THREAD) {
+//        bucketId = PartitionedRegionHelper.getHashKey(
+//            ((EntryEventImpl)clonedEvent).getEventId().getThreadID(),
+//            getMaxParallelismForReplicatedRegion());
+//      }
+//      else
+        bucketId = PartitionedRegionHelper.getHashKey(clonedEvent.getKey(),
+            getMaxParallelismForReplicatedRegion());
+    }
+    else {
+      bucketId = PartitionedRegionHelper
+          .getHashKey((EntryOperation)clonedEvent);
+    }
+    EventID originalEventId = clonedEvent.getEventId();
+    long originatingThreadId = ThreadIdentifier.getRealThreadID(originalEventId.getThreadID());
+
+    long newThreadId = ThreadIdentifier
+    .createFakeThreadIDForParallelGSPrimaryBucket(bucketId,
+        originatingThreadId, getEventIdIndex());
+    
+    // In case of parallel as all events go through primary buckets
+    // we don't neet to generate different threadId for secondary buckets
+    // as they will be rejected if seen at PR level itself
+    
+//    boolean isPrimary = ((PartitionedRegion)getQueue().getRegion())
+//    .getRegionAdvisor().getBucketAdvisor(bucketId).isPrimary();
+//    if (isPrimary) {
+//      newThreadId = ThreadIdentifier
+//          .createFakeThreadIDForParallelGSPrimaryBucket(bucketId,
+//              originatingThreadId);
+//    } else {
+//      newThreadId = ThreadIdentifier
+//          .createFakeThreadIDForParallelGSSecondaryBucket(bucketId,
+//              originatingThreadId);
+//    }
+
+    EventID newEventId = new EventID(originalEventId.getMembershipID(),
+        newThreadId, originalEventId.getSequenceID(), bucketId);
+    if (logger.isDebugEnabled()) {
+      logger.debug("{}: Generated event id for event with key={}, bucketId={}, original event id={}, threadId={}, new event id={}, newThreadId={}",
+          this, clonedEvent.getKey(), bucketId, originalEventId, originatingThreadId, newEventId, newThreadId);
+    }
+    clonedEvent.setEventId(newEventId);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/RemoteConcurrentParallelGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/RemoteConcurrentParallelGatewaySenderEventProcessor.java b/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/RemoteConcurrentParallelGatewaySenderEventProcessor.java
new file mode 100644
index 0000000..db7c03c
--- /dev/null
+++ b/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/RemoteConcurrentParallelGatewaySenderEventProcessor.java
@@ -0,0 +1,51 @@
+package com.gemstone.gemfire.internal.cache.wan.parallel;
+
+
+import java.util.Set;
+
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderStats;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventRemoteDispatcher;
+import com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderEventProcessor;
+/**
+ * Remote version of GatewaySenderEvent Processor
+ * @author skumar
+ *
+ */
+public class RemoteConcurrentParallelGatewaySenderEventProcessor extends ConcurrentParallelGatewaySenderEventProcessor{
+  
+  public RemoteConcurrentParallelGatewaySenderEventProcessor(
+      AbstractGatewaySender sender) {
+    super(sender);
+  }
+
+  @Override
+  protected void createProcessors(int dispatcherThreads, Set<Region> targetRs) {
+    processors = new RemoteParallelGatewaySenderEventProcessor[sender.getDispatcherThreads()];
+    if (logger.isDebugEnabled()) {
+      logger.debug("Creating GatewaySenderEventProcessor");
+    }
+    for (int i = 0; i < sender.getDispatcherThreads(); i++) {
+      processors[i] = new RemoteParallelGatewaySenderEventProcessor(sender,
+          targetRs, i, sender.getDispatcherThreads());
+    }
+  }
+  
+  @Override
+  protected void rebalance() {
+    GatewaySenderStats statistics = this.sender.getStatistics();
+    long startTime = statistics.startLoadBalance();
+    try {
+      for (ParallelGatewaySenderEventProcessor parallelProcessor : this.processors) {
+        GatewaySenderEventRemoteDispatcher remoteDispatcher = (GatewaySenderEventRemoteDispatcher)parallelProcessor.getDispatcher();
+        if (remoteDispatcher.isConnectedToRemote()) {
+          remoteDispatcher.stopAckReaderThread();
+          remoteDispatcher.destroyConnection();
+        }
+      }
+    } finally {
+      statistics.endLoadBalance(startTime);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java b/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java
new file mode 100644
index 0000000..f002eae
--- /dev/null
+++ b/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java
@@ -0,0 +1,106 @@
+package com.gemstone.gemfire.internal.cache.wan.parallel;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.client.internal.Connection;
+import com.gemstone.gemfire.cache.client.internal.pooling.ConnectionDestroyedException;
+import com.gemstone.gemfire.cache.wan.GatewaySender;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderConfigurationException;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventCallbackDispatcher;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventDispatcher;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventRemoteDispatcher;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderException;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderStats;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+public class RemoteParallelGatewaySenderEventProcessor extends ParallelGatewaySenderEventProcessor {
+  private static final Logger logger = LogService.getLogger();
+  
+  protected RemoteParallelGatewaySenderEventProcessor(
+      AbstractGatewaySender sender) {
+    super(sender);
+  }
+  
+  /**
+   * use in concurrent scenario where queue is to be shared among all the processors.
+   */
+  protected RemoteParallelGatewaySenderEventProcessor(AbstractGatewaySender sender,  Set<Region> userRegions, int id, int nDispatcher) {
+    super(sender,  userRegions, id, nDispatcher);
+  }
+  
+  @Override
+  protected void rebalance() {
+    GatewaySenderStats statistics = this.sender.getStatistics();
+    long startTime = statistics.startLoadBalance();
+    try {
+      if (this.dispatcher.isRemoteDispatcher()) {
+        GatewaySenderEventRemoteDispatcher remoteDispatcher = (GatewaySenderEventRemoteDispatcher) this.dispatcher;
+        if (remoteDispatcher.isConnectedToRemote()) {
+          remoteDispatcher.stopAckReaderThread();
+          remoteDispatcher.destroyConnection();
+        }
+      }
+    } finally {
+      statistics.endLoadBalance(startTime);
+    }
+  }
+  
+  public void initializeEventDispatcher() {
+    if (logger.isDebugEnabled()) {
+      logger.debug(" Creating the GatewayEventRemoteDispatcher");
+    }
+    if (this.sender.getRemoteDSId() != GatewaySender.DEFAULT_DISTRIBUTED_SYSTEM_ID) {
+      this.dispatcher = new GatewaySenderEventRemoteDispatcher(this);
+    }
+  }
+  
+  /**
+   * Returns if corresponding receiver WAN site of this GatewaySender has
+   * GemfireVersion > 7.0.1
+   * 
+   * @param disp
+   * @return true if remote site Gemfire Version is >= 7.0.1
+   */
+  private boolean shouldSendVersionEvents(GatewaySenderEventDispatcher disp)
+      throws GatewaySenderException {
+      try {
+        GatewaySenderEventRemoteDispatcher remoteDispatcher = (GatewaySenderEventRemoteDispatcher) disp;
+        // This will create a new connection if no batch has been sent till
+        // now.
+        Connection conn = remoteDispatcher.getConnection(false);
+        if (conn != null) {
+          short remoteSiteVersion = conn.getWanSiteVersion();
+          if (Version.GFE_701.compareTo(remoteSiteVersion) <= 0) {
+            return true;
+          }
+        }
+      } catch (GatewaySenderException e) {
+        Throwable cause = e.getCause();
+        if (cause instanceof IOException
+            || e instanceof GatewaySenderConfigurationException
+            || cause instanceof ConnectionDestroyedException) {
+          try {
+            int sleepInterval = GatewaySender.CONNECTION_RETRY_INTERVAL;
+            if (logger.isDebugEnabled()) {
+              logger.debug("Sleeping for {} milliseconds", sleepInterval);
+            }
+            Thread.sleep(sleepInterval);
+          } catch (InterruptedException ie) {
+            // log the exception
+            if (logger.isDebugEnabled()){
+              logger.debug(ie.getMessage(), ie);
+            }
+          }
+        }
+        throw e;
+      }
+    return false;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/RemoteConcurrentSerialGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/RemoteConcurrentSerialGatewaySenderEventProcessor.java b/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/RemoteConcurrentSerialGatewaySenderEventProcessor.java
new file mode 100644
index 0000000..440930d
--- /dev/null
+++ b/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/RemoteConcurrentSerialGatewaySenderEventProcessor.java
@@ -0,0 +1,29 @@
+package com.gemstone.gemfire.internal.cache.wan.serial;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+public class RemoteConcurrentSerialGatewaySenderEventProcessor extends
+    ConcurrentSerialGatewaySenderEventProcessor {
+
+  private static final Logger logger = LogService.getLogger();
+  
+  public RemoteConcurrentSerialGatewaySenderEventProcessor(
+      AbstractGatewaySender sender) {
+    super(sender);
+  }
+
+  @Override
+  protected void initializeMessageQueue(String id) {
+    for (int i = 0; i < sender.getDispatcherThreads(); i++) {
+      processors.add(new RemoteSerialGatewaySenderEventProcessor(this.sender, id
+          + "." + i));
+      if (logger.isDebugEnabled()) {
+        logger.debug("Created the RemoteSerialGatewayEventProcessor_{}->{}", i, processors.get(i));
+      }
+    }
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java b/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java
new file mode 100644
index 0000000..ea02870
--- /dev/null
+++ b/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java
@@ -0,0 +1,34 @@
+package com.gemstone.gemfire.internal.cache.wan.serial;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventRemoteDispatcher;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventCallbackDispatcher;
+import com.gemstone.gemfire.cache.wan.GatewaySender;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+public class RemoteSerialGatewaySenderEventProcessor extends
+    SerialGatewaySenderEventProcessor {
+
+  private static final Logger logger = LogService.getLogger();
+  public RemoteSerialGatewaySenderEventProcessor(AbstractGatewaySender sender,
+      String id) {
+    super(sender, id);
+  }
+
+  public void initializeEventDispatcher() {
+    if (logger.isDebugEnabled()) {
+      logger.debug(" Creating the GatewayEventRemoteDispatcher");
+    }
+    // In case of serial there is a way to create gatewaysender and attach
+    // asynceventlistener. Not sure of the use-case but there are dunit tests
+    // To make them passuncommenting the below condition
+    if (this.sender.getRemoteDSId() != GatewaySender.DEFAULT_DISTRIBUTED_SYSTEM_ID) {
+      this.dispatcher = new GatewaySenderEventRemoteDispatcher(this);
+    }else{
+      this.dispatcher = new GatewaySenderEventCallbackDispatcher(this);
+    }
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderImpl.java b/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderImpl.java
new file mode 100644
index 0000000..d69d747
--- /dev/null
+++ b/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderImpl.java
@@ -0,0 +1,253 @@
+/*
+ * =========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved. 
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ * ========================================================================
+ */
+
+package com.gemstone.gemfire.internal.cache.wan.serial;
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
+import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl;
+import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueStats;
+import com.gemstone.gemfire.cache.wan.GatewaySender;
+import com.gemstone.gemfire.cache.wan.GatewayTransportFilter;
+import com.gemstone.gemfire.distributed.DistributedLockService;
+import com.gemstone.gemfire.distributed.internal.DistributionAdvisor.Profile;
+import com.gemstone.gemfire.distributed.internal.DistributionAdvisor;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.distributed.internal.ResourceEvent;
+import com.gemstone.gemfire.internal.cache.EntryEventImpl;
+import com.gemstone.gemfire.internal.cache.EventID;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.RegionQueue;
+import com.gemstone.gemfire.internal.cache.UpdateAttributesProcessor;
+import com.gemstone.gemfire.internal.cache.ha.ThreadIdentifier;
+import com.gemstone.gemfire.internal.cache.wan.AbstractRemoteGatewaySender;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderAdvisor.GatewaySenderProfile;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderAdvisor;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderAttributes;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderConfigurationException;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderStats;
+import com.gemstone.gemfire.internal.cache.xmlcache.CacheCreation;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+
+/**
+ * @author Suranjan Kumar
+ * @author Yogesh Mahajan
+ * @since 7.0
+ *
+ */
+public class SerialGatewaySenderImpl extends AbstractRemoteGatewaySender {
+
+  private static final Logger logger = LogService.getLogger();
+  
+  final ThreadGroup loggerGroup = LoggingThreadGroup.createThreadGroup(
+      "Remote Site Discovery Logger Group", logger);
+
+  public SerialGatewaySenderImpl(){
+    super();
+    this.isParallel = false;
+  }
+  public SerialGatewaySenderImpl(Cache cache,
+      GatewaySenderAttributes attrs) {
+    super(cache, attrs);
+  }
+  
+  @Override
+  public void start() {
+    if (logger.isDebugEnabled()) {
+      logger.debug("Starting gatewaySender : {}", this);
+    }
+    
+    this.lifeCycleLock.writeLock().lock();
+    try {
+      if (isRunning()) {
+        logger.warn(LocalizedMessage.create(LocalizedStrings.GatewaySender_SENDER_0_IS_ALREADY_RUNNING, this.getId()));
+        return;
+      }
+      if (this.remoteDSId != DEFAULT_DISTRIBUTED_SYSTEM_ID) {
+        String locators = ((GemFireCacheImpl)this.cache).getDistributedSystem()
+            .getConfig().getLocators();
+        if (locators.length() == 0) {
+          throw new GatewaySenderConfigurationException(
+              LocalizedStrings.AbstractGatewaySender_LOCATOR_SHOULD_BE_CONFIGURED_BEFORE_STARTING_GATEWAY_SENDER
+                  .toLocalizedString());
+        }
+      }
+      getSenderAdvisor().initDLockService();
+      if (!isPrimary()) {
+        if (getSenderAdvisor().volunteerForPrimary()) {
+          getSenderAdvisor().makePrimary();
+        } else {
+          getSenderAdvisor().makeSecondary();
+        }
+      }
+      if (getDispatcherThreads() > 1) {
+        eventProcessor = new RemoteConcurrentSerialGatewaySenderEventProcessor(
+            SerialGatewaySenderImpl.this);
+      } else {
+        eventProcessor = new RemoteSerialGatewaySenderEventProcessor(
+            SerialGatewaySenderImpl.this, getId());
+      }
+      eventProcessor.start();
+      waitForRunningStatus();
+      this.startTime = System.currentTimeMillis();
+      
+      //Only notify the type registry if this is a WAN gateway queue
+      if(!isAsyncEventQueue()) {
+        ((GemFireCacheImpl) getCache()).getPdxRegistry().gatewaySenderStarted(this);
+      }
+      new UpdateAttributesProcessor(this).distribute(false);
+
+      
+      InternalDistributedSystem system = (InternalDistributedSystem) this.cache
+          .getDistributedSystem();
+      system.handleResourceEvent(ResourceEvent.GATEWAYSENDER_START, this);
+      
+      logger.info(LocalizedMessage.create(LocalizedStrings.SerialGatewaySenderImpl_STARTED__0, this));
+  
+      enqueueTempEvents();
+    } finally {
+      this.lifeCycleLock.writeLock().unlock();
+    }
+  }
+  
+  @Override
+  public void stop() {
+    if (logger.isDebugEnabled()) {
+      logger.debug("Stopping Gateway Sender : {}", this);
+    }
+    this.lifeCycleLock.writeLock().lock();
+    try {
+      // Stop the dispatcher
+      AbstractGatewaySenderEventProcessor ev = this.eventProcessor;
+      if (ev != null && !ev.isStopped()) {
+        ev.stopProcessing();
+      }
+      this.eventProcessor = null;
+
+      // Stop the proxy (after the dispatcher, so the socket is still
+      // alive until after the dispatcher has stopped)
+      stompProxyDead();
+
+      // Close the listeners
+      for (AsyncEventListener listener : this.listeners) {
+        listener.close();
+      }
+      logger.info(LocalizedMessage.create(LocalizedStrings.GatewayImpl_STOPPED__0, this));
+      
+      clearTempEventsAfterSenderStopped();
+    } finally {
+      this.lifeCycleLock.writeLock().unlock();
+    }
+    if (this.isPrimary()) {
+      try {
+        DistributedLockService
+            .destroy(getSenderAdvisor().getDLockServiceName());
+      } catch (IllegalArgumentException e) {
+        // service not found... ignore
+      }
+    }
+    if (getQueues() != null && !getQueues().isEmpty()) {
+      for (RegionQueue q : getQueues()) {
+        ((SerialGatewaySenderQueue)q).cleanUp();
+      }
+    }
+    this.setIsPrimary(false);
+    new UpdateAttributesProcessor(this).distribute(false);
+    Thread lockObtainingThread = getSenderAdvisor().getLockObtainingThread();
+    if (lockObtainingThread != null && lockObtainingThread.isAlive()) {
+      // wait a while for thread to terminate
+      try {
+        lockObtainingThread.join(3000);
+      } catch (InterruptedException ex) {
+        // we allowed our join to be canceled
+        // reset interrupt bit so this thread knows it has been interrupted
+        Thread.currentThread().interrupt();
+      }
+      if (lockObtainingThread.isAlive()) {
+        logger.info(LocalizedMessage.create(LocalizedStrings.GatewaySender_COULD_NOT_STOP_LOCK_OBTAINING_THREAD_DURING_GATEWAY_SENDER_STOP));
+      }
+    }
+    
+    InternalDistributedSystem system = (InternalDistributedSystem) this.cache
+        .getDistributedSystem();
+    system.handleResourceEvent(ResourceEvent.GATEWAYSENDER_STOP, this);
+  }
+  
+  @Override
+  public String toString() {
+    StringBuffer sb = new StringBuffer();
+    sb.append("SerialGatewaySender{");
+    sb.append("id=" + getId());
+    sb.append(",remoteDsId="+ getRemoteDSId());
+    sb.append(",isRunning ="+ isRunning());
+    sb.append(",isPrimary ="+ isPrimary());
+    sb.append("}");
+    return sb.toString();
+  }
+ 
+  @Override
+  public void fillInProfile(Profile profile) {
+    assert profile instanceof GatewaySenderProfile;
+    GatewaySenderProfile pf = (GatewaySenderProfile)profile;
+    pf.Id = getId();
+    pf.startTime = getStartTime();
+    pf.remoteDSId = getRemoteDSId();
+    pf.isRunning = isRunning();
+    pf.isPrimary = isPrimary();
+    pf.isParallel = false;
+    pf.isBatchConflationEnabled = isBatchConflationEnabled();
+    pf.isPersistenceEnabled = isPersistenceEnabled();
+    pf.alertThreshold = getAlertThreshold();
+    pf.manualStart = isManualStart();
+    for (com.gemstone.gemfire.cache.wan.GatewayEventFilter filter : getGatewayEventFilters()) {
+      pf.eventFiltersClassNames.add(filter.getClass().getName());
+    }
+    for (GatewayTransportFilter filter : getGatewayTransportFilters()) {
+      pf.transFiltersClassNames.add(filter.getClass().getName());
+    }
+    for (AsyncEventListener listener : getAsyncEventListeners()) {
+      pf.senderEventListenerClassNames.add(listener.getClass().getName());
+    }
+    pf.isDiskSynchronous = isDiskSynchronous();
+    pf.dispatcherThreads = getDispatcherThreads();
+    pf.orderPolicy = getOrderPolicy();
+    pf.serverLocation = this.getServerLocation(); 
+  }
+
+  /* (non-Javadoc)
+   * @see com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender#setModifiedEventId(com.gemstone.gemfire.internal.cache.EntryEventImpl)
+   */
+  @Override
+  protected void setModifiedEventId(EntryEventImpl clonedEvent) {
+    EventID originalEventId = clonedEvent.getEventId();
+    long originalThreadId = originalEventId.getThreadID();
+    long newThreadId = originalThreadId;
+    if (ThreadIdentifier.isWanTypeThreadID(newThreadId)) {
+      // This thread id has already been converted. Do nothing.
+    } else {
+      newThreadId = ThreadIdentifier
+        .createFakeThreadIDForParallelGSPrimaryBucket(0, originalThreadId,
+            getEventIdIndex());
+    }
+    EventID newEventId = new EventID(originalEventId.getMembershipID(),
+        newThreadId, originalEventId.getSequenceID());
+    if (logger.isDebugEnabled()) {
+      logger.debug("{}: Generated event id for event with key={}, original event id={}, originalThreadId={}, new event id={}, newThreadId={}",
+          this, clonedEvent.getKey(), originalEventId, originalThreadId, newEventId, newThreadId);
+    }
+    clonedEvent.setEventId(newEventId);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/main/resources/.keepme
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/main/resources/.keepme b/gemfire-wan/src/main/resources/.keepme
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/main/resources/META-INF/services/com.gemstone.gemfire.cache.client.internal.locator.wan.LocatorMembershipListener
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/main/resources/META-INF/services/com.gemstone.gemfire.cache.client.internal.locator.wan.LocatorMembershipListener b/gemfire-wan/src/main/resources/META-INF/services/com.gemstone.gemfire.cache.client.internal.locator.wan.LocatorMembershipListener
new file mode 100644
index 0000000..ce96f96
--- /dev/null
+++ b/gemfire-wan/src/main/resources/META-INF/services/com.gemstone.gemfire.cache.client.internal.locator.wan.LocatorMembershipListener
@@ -0,0 +1 @@
+com.gemstone.gemfire.cache.client.internal.locator.wan.LocatorMembershipListenerImpl
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/main/resources/META-INF/services/com.gemstone.gemfire.internal.cache.wan.spi.WANFactory
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/main/resources/META-INF/services/com.gemstone.gemfire.internal.cache.wan.spi.WANFactory b/gemfire-wan/src/main/resources/META-INF/services/com.gemstone.gemfire.internal.cache.wan.spi.WANFactory
new file mode 100644
index 0000000..4d47e35
--- /dev/null
+++ b/gemfire-wan/src/main/resources/META-INF/services/com.gemstone.gemfire.internal.cache.wan.spi.WANFactory
@@ -0,0 +1 @@
+com.gemstone.gemfire.cache.client.internal.locator.wan.WANFactoryImpl


Mime
View raw message