geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zho...@apache.org
Subject [geode] 01/01: GEODE-5087: send a BatchDestroyOperation for each dropped event at serial primary sender
Date Wed, 09 May 2018 23:38:10 GMT
This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch feature/GEODE-5087
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 9dcec3010c454460e00f2792bd4c5a0d2e943ee8
Author: zhouxh <gzhou@pivotal.io>
AuthorDate: Thu May 3 17:15:33 2018 -0700

    GEODE-5087: send a BatchDestroyOperation for each dropped event at serial primary sender
---
 .../internal/ParallelAsyncEventQueueImpl.java      |  2 +-
 .../internal/SerialAsyncEventQueueImpl.java        |  2 +-
 .../internal/cache/wan/AbstractGatewaySender.java  | 68 +++++++++++++---------
 .../wan/AbstractGatewaySenderEventProcessor.java   | 33 +----------
 ...currentParallelGatewaySenderEventProcessor.java | 32 ++++++++++
 .../ParallelGatewaySenderEventProcessor.java       |  6 ++
 .../cache/wan/serial/BatchDestroyOperation.java    | 25 +++++++-
 ...oncurrentSerialGatewaySenderEventProcessor.java | 64 ++++++++++++--------
 .../serial/SerialGatewaySenderEventProcessor.java  | 47 +++++++++++++--
 .../xmlcache/ParallelAsyncEventQueueCreation.java  |  2 +-
 .../xmlcache/ParallelGatewaySenderCreation.java    |  2 +-
 .../xmlcache/SerialAsyncEventQueueCreation.java    |  2 +-
 .../xmlcache/SerialGatewaySenderCreation.java      |  2 +-
 .../wan/parallel/ParallelGatewaySenderImpl.java    |  2 +-
 .../cache/wan/serial/SerialGatewaySenderImpl.java  |  6 +-
 .../SerialGatewaySenderOperationsDUnitTest.java    |  2 -
 16 files changed, 196 insertions(+), 101 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java
b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java
index 538b65a..8e2e4e4 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java
@@ -168,7 +168,7 @@ public class ParallelAsyncEventQueueImpl extends AbstractGatewaySender {
   }
 
   @Override
-  protected void setModifiedEventId(EntryEventImpl clonedEvent) {
+  public void setModifiedEventId(EntryEventImpl clonedEvent) {
     int bucketId = -1;
     // merged from 42004
     if (clonedEvent.getRegion() instanceof DistributedRegion) {
diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java
b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java
index 9e0239d..400126d 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java
@@ -225,7 +225,7 @@ public class SerialAsyncEventQueueImpl extends AbstractGatewaySender {
    * internal.cache.EntryEventImpl)
    */
   @Override
-  protected void setModifiedEventId(EntryEventImpl clonedEvent) {
+  public void setModifiedEventId(EntryEventImpl clonedEvent) {
     EventID originalEventId = clonedEvent.getEventId();
     long originalThreadId = originalEventId.getThreadID();
     long newThreadId = originalThreadId;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index 1027582..3b55989 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -173,6 +173,9 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
 
   protected volatile ConcurrentLinkedQueue<TmpQueueEvent> tmpQueuedEvents =
       new ConcurrentLinkedQueue<>();
+
+  protected volatile ConcurrentLinkedQueue<EntryEventImpl> tmpDroppedEvents =
+      new ConcurrentLinkedQueue<>();
   /**
    * The number of seconds to wait before stopping the GatewaySender. Default is 0 seconds.
    */
@@ -844,40 +847,43 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
 
     final boolean isDebugEnabled = logger.isDebugEnabled();
 
-    // If this gateway is not running, return
-    if (!isRunning()) {
-      if (isDebugEnabled) {
-        logger.debug("Returning back without putting into the gateway sender queue:" + event);
-      }
-      if (this.eventProcessor != null) {
-        this.eventProcessor.registerEventDroppedInPrimaryQueue(event);
-      }
-      return;
-    }
-
-    final GatewaySenderStats stats = getStatistics();
-    stats.incEventsReceived();
-
-    if (!checkForDistribution(event, stats)) {
-      stats.incEventsNotQueued();
-      return;
-    }
-
-    // this filter is defined by Asif which exist in old wan too. new wan has
-    // other GatewaEventFilter. Do we need to get rid of this filter. Cheetah is
-    // not considering this filter
-    if (!this.filter.enqueueEvent(event)) {
-      stats.incEventsFiltered();
-      return;
-    }
     // released by this method or transfers ownership to TmpQueueEvent
     @Released
     EntryEventImpl clonedEvent = new EntryEventImpl(event, false);
     boolean freeClonedEvent = true;
     try {
 
-      Region region = event.getRegion();
+      // If this gateway is not running, return
+      if (!isRunning()) {
+        if (this.isPrimary()) {
+          tmpDroppedEvents.add(clonedEvent);
+          if (isDebugEnabled) {
+            logger.debug("add to tmpDroppedEvents for evnet {}", clonedEvent);
+          }
+        }
+        if (isDebugEnabled) {
+          logger.debug("Returning back without putting into the gateway sender queue:" + event);
+        }
+        return;
+      }
+
+      final GatewaySenderStats stats = getStatistics();
+      stats.incEventsReceived();
+
+      if (!checkForDistribution(event, stats)) {
+        stats.incEventsNotQueued();
+        return;
+      }
 
+      // this filter is defined by Asif which exist in old wan too. new wan has
+      // other GatewaEventFilter. Do we need to get rid of this filter. Cheetah is
+      // not considering this filter
+      if (!this.filter.enqueueEvent(event)) {
+        stats.incEventsFiltered();
+        return;
+      }
+
+      // start to distribute
       setModifiedEventId(clonedEvent);
       Object callbackArg = clonedEvent.getRawCallbackArgument();
 
@@ -1024,6 +1030,12 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
    */
   public void enqueueTempEvents() {
     if (this.eventProcessor != null) {// Fix for defect #47308
+      // process tmpDroppedEvents
+      EntryEventImpl droppedEvent = null;
+      while ((droppedEvent = tmpDroppedEvents.poll()) != null) {
+        this.eventProcessor.registerEventDroppedInPrimaryQueue(droppedEvent);
+      }
+
       TmpQueueEvent nextEvent = null;
       final GatewaySenderStats stats = getStatistics();
       try {
@@ -1224,7 +1236,7 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
     return region;
   }
 
-  protected abstract void setModifiedEventId(EntryEventImpl clonedEvent);
+  public abstract void setModifiedEventId(EntryEventImpl clonedEvent);
 
   public static class DefaultGatewayEventFilter
       implements org.apache.geode.internal.cache.GatewayEventFilter {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index 2ce06c6..89fa586 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -33,7 +33,6 @@ import org.apache.geode.GemFireException;
 import org.apache.geode.SystemFailure;
 import org.apache.geode.cache.CacheException;
 import org.apache.geode.cache.EntryEvent;
-import org.apache.geode.cache.EntryOperation;
 import org.apache.geode.cache.Operation;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionDestroyedException;
@@ -50,7 +49,6 @@ import org.apache.geode.internal.cache.EventID;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.LocalRegion;
 import org.apache.geode.internal.cache.PartitionedRegion;
-import org.apache.geode.internal.cache.PartitionedRegionHelper;
 import org.apache.geode.internal.cache.RegionQueue;
 import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
 import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue;
@@ -279,36 +277,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
     return this.queue.size();
   }
 
-  public void registerEventDroppedInPrimaryQueue(EntryEventImpl event) {
-    if (queue == null) {
-      return;
-    }
-    if (this.queue instanceof ConcurrentParallelGatewaySenderQueue) {
-      ConcurrentParallelGatewaySenderQueue cpgsq = (ConcurrentParallelGatewaySenderQueue) queue;
-      PartitionedRegion prQ = cpgsq.getRegion(event.getRegion().getFullPath());
-      if (prQ == null) {
-        if (logger.isDebugEnabled()) {
-          logger.debug("shadow partitioned region " + event.getRegion().getFullPath()
-              + " is not created yet.");
-        }
-        return;
-      }
-      int bucketId = PartitionedRegionHelper.getHashKey((EntryOperation) event);
-      long shadowKey = event.getTailKey();
-
-      ParallelGatewaySenderQueue pgsq =
-          (ParallelGatewaySenderQueue) cpgsq.getQueueByBucket(bucketId);
-      boolean isPrimary = prQ.getRegionAdvisor().getBucketAdvisor(bucketId).isPrimary();
-      if (isPrimary) {
-        pgsq.sendQueueRemovalMesssageForDroppedEvent(prQ, bucketId, shadowKey);
-        this.sender.getStatistics().incEventsDroppedDueToPrimarySenderNotRunning();
-        if (logger.isDebugEnabled()) {
-          logger.debug("register dropped event for primary queue. BucketId is " + bucketId
-              + ", shadowKey is " + shadowKey + ", prQ is " + prQ.getFullPath());
-        }
-      }
-    }
-  }
+  protected abstract void registerEventDroppedInPrimaryQueue(EntryEventImpl droppedEvent);
 
   /**
    * @return the sender
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
index 54b7034..6b8cce1 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
@@ -32,12 +32,15 @@ import org.apache.geode.GemFireException;
 import org.apache.geode.InternalGemFireException;
 import org.apache.geode.cache.CacheException;
 import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.EntryOperation;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.wan.GatewayQueueEvent;
 import org.apache.geode.internal.cache.EntryEventImpl;
 import org.apache.geode.internal.cache.EnumListenerEvent;
 import org.apache.geode.internal.cache.InternalRegion;
 import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PartitionedRegionHelper;
 import org.apache.geode.internal.cache.RegionQueue;
 import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
 import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor;
@@ -138,6 +141,35 @@ public class ConcurrentParallelGatewaySenderEventProcessor
   }
 
   @Override
+  protected void registerEventDroppedInPrimaryQueue(EntryEventImpl droppedEvent) {
+    if (queue == null) {
+      return;
+    }
+    ConcurrentParallelGatewaySenderQueue cpgsq = (ConcurrentParallelGatewaySenderQueue) queue;
+    PartitionedRegion prQ = cpgsq.getRegion(droppedEvent.getRegion().getFullPath());
+    if (prQ == null) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("shadow partitioned region " + droppedEvent.getRegion().getFullPath()
+            + " is not created yet.");
+      }
+      return;
+    }
+    int bucketId = PartitionedRegionHelper.getHashKey((EntryOperation) droppedEvent);
+    long shadowKey = droppedEvent.getTailKey();
+
+    ParallelGatewaySenderQueue pgsq = (ParallelGatewaySenderQueue) cpgsq.getQueueByBucket(bucketId);
+    boolean isPrimary = prQ.getRegionAdvisor().getBucketAdvisor(bucketId).isPrimary();
+    if (isPrimary) {
+      pgsq.sendQueueRemovalMesssageForDroppedEvent(prQ, bucketId, shadowKey);
+      this.sender.getStatistics().incEventsDroppedDueToPrimarySenderNotRunning();
+      if (logger.isDebugEnabled()) {
+        logger.debug("register dropped event for primary queue. BucketId is " + bucketId
+            + ", shadowKey is " + shadowKey + ", prQ is " + prQ.getFullPath());
+      }
+    }
+  }
+
+  @Override
   public void run() {
     final boolean isDebugEnabled = logger.isDebugEnabled();
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
index 5715a35..77811c8 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
@@ -149,6 +149,12 @@ public class ParallelGatewaySenderEventProcessor extends AbstractGatewaySenderEv
     }
   }
 
+  @Override
+  protected void registerEventDroppedInPrimaryQueue(EntryEventImpl droppedEvent) {
+    logger.info("ParallelGatewaySenderEventProcessor should not process dropped event {}",
+        droppedEvent);
+  }
+
   public void clear(PartitionedRegion pr, int bucketId) {
     ((ParallelGatewaySenderQueue) this.queue).clear(pr, bucketId);
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/BatchDestroyOperation.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/BatchDestroyOperation.java
index debb005..0744561 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/BatchDestroyOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/BatchDestroyOperation.java
@@ -102,7 +102,7 @@ public class BatchDestroyOperation extends DistributedCacheOperation {
         }
 
         // Optimized way
-        for (long k = (Long) this.key; k <= this.tailKey; k++) {
+        for (long k = (Long) this.key; k <= this.tailKey && this.tailKey != -1; k++) {
           try {
             for (GatewayEventFilter filter : rgn.getSerialGatewaySender()
                 .getGatewayEventFilters()) {
@@ -124,6 +124,29 @@ public class BatchDestroyOperation extends DistributedCacheOperation {
             }
           }
         }
+
+        // destroy dropped event from unprocessedKeys
+        if (this.tailKey == -1) {
+          SerialGatewaySenderEventProcessor ep = null;
+          int index = ((Long) this.key).intValue();
+          if (index == -1) {
+            // this is SerialGatewaySenderEventProcessor
+            ep = (SerialGatewaySenderEventProcessor) rgn.getSerialGatewaySender()
+                .getEventProcessor();
+          } else {
+            ConcurrentSerialGatewaySenderEventProcessor csgep =
+                (ConcurrentSerialGatewaySenderEventProcessor) rgn.getSerialGatewaySender()
+                    .getEventProcessor();
+            ep = csgep.processors.get(index);
+          }
+          boolean removed = ep.basicHandlePrimaryDestroy(ev.getEventId());
+          if (removed) {
+            if (isDebugEnabled) {
+              logger.debug("Removed a dropped event {} from unprocessedEvents.",
+                  (EntryEventImpl) event);
+            }
+          }
+        }
         this.appliedOperation = true;
       } catch (CacheWriterException e) {
         throw new Error(
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
index ec01fd9..8ec6ce1 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
@@ -109,6 +109,35 @@ public class ConcurrentSerialGatewaySenderEventProcessor
 
   }
 
+  public void setModifiedEventId(EntryEventImpl clonedEvent, int index) {
+    EventID originalEventId = clonedEvent.getEventId();
+    if (logger.isDebugEnabled()) {
+      logger.debug("The original EventId is {}", originalEventId);
+    }
+    // PARALLEL_THREAD_BUFFER * (index +1) + originalEventId.getThreadID();
+    // generating threadId by the algorithm explained above used to clash with
+    // fakeThreadId generated by putAll
+    // below is new way to generate threadId so that it doesn't clash with
+    // any.
+    long newThreadId =
+        ThreadIdentifier.createFakeThreadIDForParallelGateway(index, originalEventId.getThreadID(),
+            0 /*
+               * gateway sender event id index has already been applied in
+               * SerialGatewaySenderImpl.setModifiedEventId
+               */);
+    EventID newEventId = new EventID(originalEventId.getMembershipID(), newThreadId,
+        originalEventId.getSequenceID());
+    if (logger.isDebugEnabled()) {
+      logger.debug(
+          "{}: Generated event id for event with key={}, index={}, original event id={}, threadId={}, new event id={}, newThreadId={}"
+              + ":index=" + this.sender.getEventIdIndex(),
+          this, clonedEvent.getKey(), index, originalEventId,
+          ThreadIdentifier.toDisplayString(originalEventId.getThreadID()), newEventId,
+          ThreadIdentifier.toDisplayString(newThreadId));
+    }
+    clonedEvent.setEventId(newEventId);
+  }
+
   public void enqueueEvent(EnumListenerEvent operation, EntryEvent event, Object substituteValue,
       int index) throws CacheException, IOException {
     // Get the appropriate gateway
@@ -121,30 +150,7 @@ public class ConcurrentSerialGatewaySenderEventProcessor
       @Released
       EntryEventImpl clonedEvent = new EntryEventImpl((EntryEventImpl) event);
       try {
-        EventID originalEventId = clonedEvent.getEventId();
-        if (logger.isDebugEnabled()) {
-          logger.debug("The original EventId is {}", originalEventId);
-        }
-        // PARALLEL_THREAD_BUFFER * (index +1) + originalEventId.getThreadID();
-        // generating threadId by the algorithm explained above used to clash with
-        // fakeThreadId generated by putAll
-        // below is new way to generate threadId so that it doesn't clash with
-        // any.
-        long newThreadId = ThreadIdentifier.createFakeThreadIDForParallelGateway(index,
-            originalEventId.getThreadID(),
-            0 /*
-               * gateway sender event id index has already been applied in
-               * SerialGatewaySenderImpl.setModifiedEventId
-               */);
-        EventID newEventId = new EventID(originalEventId.getMembershipID(), newThreadId,
-            originalEventId.getSequenceID());
-        if (logger.isDebugEnabled()) {
-          logger.debug(
-              "{}: Generated event id for event with key={}, index={}, original event id={}, threadId={}, new event id={}, newThreadId={}",
-              this, event.getKey(), index, originalEventId, originalEventId.getThreadID(),
-              newEventId, newThreadId);
-        }
-        clonedEvent.setEventId(newEventId);
+        setModifiedEventId(clonedEvent, index);
         serialProcessor.enqueueEvent(operation, clonedEvent, substituteValue);
       } finally {
         clonedEvent.release();
@@ -375,6 +381,16 @@ public class ConcurrentSerialGatewaySenderEventProcessor
   }
 
   @Override
+  protected void registerEventDroppedInPrimaryQueue(EntryEventImpl droppedEvent) {
+    this.getSender().setModifiedEventId(droppedEvent);
+    // modified event again for concurrent SGSEP
+    int index = Math.abs(getHashCode(((EntryEventImpl) droppedEvent)) % this.processors.size());
+    setModifiedEventId(droppedEvent, index);
+
+    this.processors.get(index).sendBatchDestroyOperationForDroppedEvent(droppedEvent, index);
+  }
+
+  @Override
   protected void enqueueEvent(GatewayQueueEvent event) {
     for (SerialGatewaySenderEventProcessor serialProcessor : this.processors) {
       serialProcessor.enqueueEvent(event);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
index 3fa4d6a..39609c7 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
@@ -45,6 +45,7 @@ import org.apache.geode.internal.cache.DistributedRegion;
 import org.apache.geode.internal.cache.EntryEventImpl;
 import org.apache.geode.internal.cache.EnumListenerEvent;
 import org.apache.geode.internal.cache.EventID;
+import org.apache.geode.internal.cache.LocalRegion;
 import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
 import org.apache.geode.internal.cache.wan.AbstractGatewaySender.EventWrapper;
 import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor;
@@ -610,7 +611,7 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven
       }
       my_executor.execute(new Runnable() {
         public void run() {
-          basicHandlePrimaryDestroy(gatewayEvent);
+          basicHandlePrimaryDestroy(gatewayEvent.getEventId());
         }
       });
     }
@@ -620,23 +621,25 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven
    * Just remove the event from the unprocessed events map if it is present. This method added to
    * fix bug 37603
    */
-  protected void basicHandlePrimaryDestroy(final GatewaySenderEventImpl gatewayEvent) {
+  protected boolean basicHandlePrimaryDestroy(final EventID eventId) {
     if (this.sender.isPrimary()) {
       // no need to do anything if we have become the primary
-      return;
+      return false;
     }
     GatewaySenderStats statistics = this.sender.getStatistics();
     // Get the event from the map
     synchronized (unprocessedEventsLock) {
       if (this.unprocessedEvents == null)
-        return;
+        return false;
       // now we can safely use the unprocessedEvents field
-      EventWrapper ew = this.unprocessedEvents.remove(gatewayEvent.getEventId());
+      EventWrapper ew = this.unprocessedEvents.remove(eventId);
       if (ew != null) {
         ew.event.release();
         statistics.incUnprocessedEventsRemovedByPrimary();
+        return true;
       }
     }
+    return false;
   }
 
   protected void basicHandlePrimaryEvent(final GatewaySenderEventImpl gatewayEvent) {
@@ -865,4 +868,38 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven
     // @TODO This API hasn't been implemented yet
     throw new UnsupportedOperationException();
   }
+
+  public void sendBatchDestroyOperationForDroppedEvent(EntryEventImpl dropEvent, int index) {
+    EntryEventImpl destroyEvent =
+        EntryEventImpl.create((LocalRegion) this.queue.getRegion(), Operation.DESTROY, (long) index,
+            null/* newValue */, null, false, sender.getCache().getMyId());
+    destroyEvent.setEventId(dropEvent.getEventId());
+    destroyEvent.disallowOffHeapValues();
+    destroyEvent.setTailKey(-1L);
+    if (logger.isDebugEnabled()) {
+      logger.debug(
+          "SerialGatewaySenderEventProcessor sends BatchDestroyOperation to secondary for event {}",
+          destroyEvent);
+    }
+
+    try {
+      BatchDestroyOperation op = new BatchDestroyOperation(destroyEvent);
+      op.distribute();
+      if (logger.isDebugEnabled()) {
+        logger.debug("BatchRemovalThread completed destroy of dropped event {}", dropEvent);
+      }
+    } catch (Exception ignore) {
+      if (logger.isDebugEnabled()) {
+        logger.debug(
+            "Exception in sending dropped event could be ignored in order not to interrupt sender starting",
+            ignore);
+      }
+    }
+  }
+
+  @Override
+  protected void registerEventDroppedInPrimaryQueue(EntryEventImpl droppedEvent) {
+    this.getSender().setModifiedEventId(droppedEvent);
+    sendBatchDestroyOperationForDroppedEvent(droppedEvent, -1);
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelAsyncEventQueueCreation.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelAsyncEventQueueCreation.java
index 6f8efa8..4686b67 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelAsyncEventQueueCreation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelAsyncEventQueueCreation.java
@@ -87,5 +87,5 @@ public class ParallelAsyncEventQueueCreation extends AbstractGatewaySender
   }
 
   @Override
-  protected void setModifiedEventId(EntryEventImpl clonedEvent) {}
+  public void setModifiedEventId(EntryEventImpl clonedEvent) {}
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelGatewaySenderCreation.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelGatewaySenderCreation.java
index 5b025b5..257ee75 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelGatewaySenderCreation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelGatewaySenderCreation.java
@@ -90,7 +90,7 @@ public class ParallelGatewaySenderCreation extends AbstractGatewaySender impleme
   }
 
   @Override
-  protected void setModifiedEventId(EntryEventImpl clonedEvent) {}
+  public void setModifiedEventId(EntryEventImpl clonedEvent) {}
 
   protected GatewayQueueEvent getSynchronizationEvent(Object key, long timestamp) {
     throw new UnsupportedOperationException();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialAsyncEventQueueCreation.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialAsyncEventQueueCreation.java
index ce71c54..cd06661 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialAsyncEventQueueCreation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialAsyncEventQueueCreation.java
@@ -86,5 +86,5 @@ public class SerialAsyncEventQueueCreation extends AbstractGatewaySender impleme
   }
 
   @Override
-  protected void setModifiedEventId(EntryEventImpl clonedEvent) {}
+  public void setModifiedEventId(EntryEventImpl clonedEvent) {}
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialGatewaySenderCreation.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialGatewaySenderCreation.java
index 80c04de..b0766ff 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialGatewaySenderCreation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialGatewaySenderCreation.java
@@ -87,7 +87,7 @@ public class SerialGatewaySenderCreation extends AbstractGatewaySender implement
   }
 
   @Override
-  protected void setModifiedEventId(EntryEventImpl clonedEvent) {}
+  public void setModifiedEventId(EntryEventImpl clonedEvent) {}
 
   protected GatewayQueueEvent getSynchronizationEvent(Object key, long timestamp) {
     throw new UnsupportedOperationException();
diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java
b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java
index d023704..f565426 100644
--- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java
@@ -167,7 +167,7 @@ public class ParallelGatewaySenderImpl extends AbstractRemoteGatewaySender {
   }
 
   @Override
-  protected void setModifiedEventId(EntryEventImpl clonedEvent) {
+  public void setModifiedEventId(EntryEventImpl clonedEvent) {
     int bucketId = -1;
     // merged from 42004
     if (clonedEvent.getRegion() instanceof DistributedRegion) {
diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
index d964253..ecca896 100644
--- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
@@ -211,7 +211,7 @@ public class SerialGatewaySenderImpl extends AbstractRemoteGatewaySender {
   }
 
   @Override
-  protected void setModifiedEventId(EntryEventImpl clonedEvent) {
+  public void setModifiedEventId(EntryEventImpl clonedEvent) {
     EventID originalEventId = clonedEvent.getEventId();
     long originalThreadId = originalEventId.getThreadID();
     long newThreadId = originalThreadId;
@@ -226,7 +226,9 @@ public class SerialGatewaySenderImpl extends AbstractRemoteGatewaySender {
     if (logger.isDebugEnabled()) {
       logger.debug(
           "{}: Generated event id for event with key={}, original event id={}, originalThreadId={}, new event id={}, newThreadId={}",
-          this, clonedEvent.getKey(), originalEventId, originalThreadId, newEventId, newThreadId);
+          this, clonedEvent.getKey(), originalEventId,
+          ThreadIdentifier.toDisplayString(originalThreadId), newEventId,
+          ThreadIdentifier.toDisplayString(newThreadId));
     }
     clonedEvent.setEventId(newEventId);
   }
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
index caa357e..4993f24 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
@@ -50,7 +50,6 @@ import org.apache.geode.test.dunit.RMIException;
 import org.apache.geode.test.dunit.SerializableRunnable;
 import org.apache.geode.test.dunit.VM;
 import org.apache.geode.test.junit.categories.DistributedTest;
-import org.apache.geode.test.junit.categories.FlakyTest;
 import org.apache.geode.test.junit.categories.WanTest;
 
 @Category({DistributedTest.class, WanTest.class})
@@ -266,7 +265,6 @@ public class SerialGatewaySenderOperationsDUnitTest extends WANTestBase {
     vm5.invoke(() -> WANTestBase.validateQueueSizeStat("ln", 0));
   }
 
-  @Category({FlakyTest.class, WanTest.class}) // GEODE-5056
   @Test
   public void testRestartSerialGatewaySendersWhilePutting() throws Throwable {
     Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));

-- 
To stop receiving notification emails like this one, please contact
zhouxj@apache.org.

Mime
View raw message