geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dschnei...@apache.org
Subject [2/4] geode git commit: GEODE-2860: Refactor use of EventTracker
Date Wed, 12 Jul 2017 18:50:12 GMT
http://git-wip-us.apache.org/repos/asf/geode/blob/0e215d4b/geode-core/src/main/java/org/apache/geode/internal/cache/event/EventSequenceNumberHolder.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/event/EventSequenceNumberHolder.java b/geode-core/src/main/java/org/apache/geode/internal/cache/event/EventSequenceNumberHolder.java
new file mode 100644
index 0000000..9bb450d
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/event/EventSequenceNumberHolder.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.event;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.geode.DataSerializable;
+import org.apache.geode.DataSerializer;
+import org.apache.geode.internal.cache.versions.VersionTag;
+
+/**
+ * A sequence number tracker to keep events from clients from being re-applied to the cache if
+ * they've already been seen.
+ *
+ * @since GemFire 5.5
+ */
+public class EventSequenceNumberHolder implements DataSerializable {
+  private static final long serialVersionUID = 8137262960763308046L;
+
+  /**
+   * event sequence number. These
+   */
+  private long lastSequenceNumber = -1;
+
+  /**
+   * millisecond timestamp
+   */
+  private transient long endOfLifeTimestamp;
+
+  /**
+   * whether this entry is being removed
+   */
+  private transient boolean removed;
+
+  /**
+   * version tag, if any, for the operation
+   */
+  private VersionTag versionTag;
+
+  // for debugging
+  // transient Exception context;
+
+  EventSequenceNumberHolder(long id, VersionTag versionTag) {
+    this.lastSequenceNumber = id;
+    this.versionTag = versionTag;
+  }
+
+  public EventSequenceNumberHolder() {}
+
+  public long getLastSequenceNumber() {
+    return lastSequenceNumber;
+  }
+
+  public VersionTag getVersionTag() {
+    return versionTag;
+  }
+
+  public boolean isRemoved() {
+    return removed;
+  }
+
+  void setRemoved(boolean removed) {
+    this.removed = removed;
+  }
+
+  void setEndOfLifeTimestamp(long endOfLifeTimestamp) {
+    this.endOfLifeTimestamp = endOfLifeTimestamp;
+  }
+
+  void setVersionTag(VersionTag versionTag) {
+    this.versionTag = versionTag;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder result = new StringBuilder();
+    result.append("seqNo").append(this.lastSequenceNumber);
+    if (this.versionTag != null) {
+      result.append(",").append(this.versionTag);
+    }
+    return result.toString();
+  }
+
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    lastSequenceNumber = in.readLong();
+    versionTag = (VersionTag) DataSerializer.readObject(in);
+  }
+
+  public void toData(DataOutput out) throws IOException {
+    out.writeLong(lastSequenceNumber);
+    DataSerializer.writeObject(versionTag, out);
+  }
+
+  public synchronized boolean expire(long now, long expirationTime) {
+    if (endOfLifeTimestamp == 0) {
+      endOfLifeTimestamp = now; // a new holder - start the timer
+    }
+    boolean expire = false;
+    if (endOfLifeTimestamp <= expirationTime) {
+      removed = true;
+      lastSequenceNumber = -1;
+      expire = true;
+    }
+    return expire;
+  }
+
+  public void setLastSequenceNumber(long lastSequenceNumber) {
+    this.lastSequenceNumber = lastSequenceNumber;
+  }
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/0e215d4b/geode-core/src/main/java/org/apache/geode/internal/cache/event/EventTracker.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/event/EventTracker.java b/geode-core/src/main/java/org/apache/geode/internal/cache/event/EventTracker.java
new file mode 100644
index 0000000..43a0458
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/event/EventTracker.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.event;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.EventID;
+import org.apache.geode.internal.cache.InternalCacheEvent;
+import org.apache.geode.internal.cache.ha.ThreadIdentifier;
+import org.apache.geode.internal.cache.versions.VersionTag;
+
+/**
+ * EventTracker tracks the last sequence number for a particular memberID:threadID. It is used to
+ * avoid replaying events in client/server and partitioned-region configurations.
+ *
+ * @since GemFire 6.0
+ */
public interface EventTracker {
  /** start this event tracker */
  void start();

  /** stop this event tracker */
  void stop();

  /**
   * retrieve a deep copy of the state of the event tracker. Synchronization is not used while
   * copying the tracker's state.
   */
  Map<ThreadIdentifier, EventSequenceNumberHolder> getState();

  /**
   * record the given state in the tracker.
   *
   * @param provider the member that provided this state
   * @param state a Map obtained from getState();
   */
  void recordState(InternalDistributedMember provider,
      Map<ThreadIdentifier, EventSequenceNumberHolder> state);

  /**
   * Use this method to ensure that the tracker is put in an initialized state
   */
  void setInitialized();

  /**
   * Wait for the tracker to finish being initialized
   */
  void waitOnInitialization() throws InterruptedException;

  /** record the event's threadid/sequenceid to prevent replay */
  void recordEvent(InternalCacheEvent event);

  /**
   * Determines if an event has already been seen by the tracker
   *
   * @param event The event to determine if it has been seen by the tracker already
   * @return if the event provided has already been seen
   */
  boolean hasSeenEvent(InternalCacheEvent event);

  /**
   * Determines if an event has already been seen by the tracker
   *
   * @param eventID The id of the event to determine if it has been seen by the tracker already
   * @return if the event provided has already been seen
   */
  boolean hasSeenEvent(EventID eventID);

  /**
   * Determines if an event has already been seen by the tracker
   *
   * @param eventID The id of the event to determine if it has been seen by the tracker already
   * @param tagHolder Event to update version tag with that of eventID, if event was seen before
   * @return if the event provided has already been seen
   */
  boolean hasSeenEvent(EventID eventID, InternalCacheEvent tagHolder);

  /**
   * Find the version tag recorded for the given event's member:thread sequence, if any.
   *
   * @param eventID the id of the event whose recorded version tag is wanted
   * @return the recorded version tag, or null if none is known
   */
  VersionTag findVersionTagForSequence(EventID eventID);

  /**
   * Find the version tag recorded for the given event as part of a bulk operation
   * (putAll/removeAll), if any.
   *
   * @param eventID the id of the bulk-op event whose recorded version tag is wanted
   * @return the recorded version tag, or null if none is known
   */
  VersionTag findVersionTagForBulkOp(EventID eventID);

  /**
   * The name of the event tracker for logging purposes
   *
   * @return the name of the tracker
   */
  String getName();

  /**
   * @return the map of recorded bulk-operation version tags, keyed by thread identifier
   */
  ConcurrentMap<ThreadIdentifier, BulkOperationHolder> getRecordedBulkOpVersionTags();

  /**
   * @return the map of recorded event sequence numbers, keyed by thread identifier
   */
  ConcurrentMap<ThreadIdentifier, EventSequenceNumberHolder> getRecordedEvents();

  /**
   * A routine to provide synchronization running based on <memberShipID, threadID> of the
   * requesting client
   *
   * @param r - a Runnable to wrap the processing of the bulk op
   * @param eventID - the base event ID of the bulk op
   *
   * @since GemFire 5.7
   */
  void syncBulkOp(Runnable r, EventID eventID, boolean partOfTransaction);

  /**
   * Called when a new bulkOp is started on the local region. Used to clear event tracker state from
   * the last bulkOp.
   */
  void recordBulkOpStart(EventID eventID, ThreadIdentifier tid);

  /**
   * @return the initialization state of the tracker
   */
  boolean isInitialized();

  /**
   * @param mbr the member in question
   * @return true if the given member provided the initial image event state for this tracker
   */
  boolean isInitialImageProvider(DistributedMember mbr);

}

http://git-wip-us.apache.org/repos/asf/geode/blob/0e215d4b/geode-core/src/main/java/org/apache/geode/internal/cache/event/EventTrackerExpiryTask.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/event/EventTrackerExpiryTask.java b/geode-core/src/main/java/org/apache/geode/internal/cache/event/EventTrackerExpiryTask.java
new file mode 100644
index 0000000..dff80df
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/event/EventTrackerExpiryTask.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.event;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.geode.internal.SystemTimer;
+import org.apache.geode.internal.cache.ha.ThreadIdentifier;
+
+public class EventTrackerExpiryTask extends SystemTimer.SystemTimerTask {
+
+  private final long lifetimeInMillis;
+  private final List<EventTracker> trackers = new LinkedList<>();
+  private final boolean traceEnabled = logger.isTraceEnabled();
+
+  public EventTrackerExpiryTask(long lifetimeInMillis) {
+    this.lifetimeInMillis = lifetimeInMillis;
+  }
+
+  void addTracker(EventTracker tracker) {
+    synchronized (trackers) {
+      trackers.add(tracker);
+    }
+  }
+
+  void removeTracker(EventTracker tracker) {
+    synchronized (trackers) {
+      trackers.remove(tracker);
+    }
+  }
+
+  int getNumberOfTrackers() {
+    return trackers.size();
+  }
+
+  @Override
+  public void run2() {
+    long now = System.currentTimeMillis();
+    long expirationTime = now - lifetimeInMillis;
+    synchronized (trackers) {
+      for (EventTracker tracker : trackers) {
+        if (traceEnabled) {
+          logger.trace("{} sweeper: starting", tracker.getName());
+        }
+        removeExpiredSequenceTracker(tracker, now, expirationTime);
+        removeExpiredBulkOperations(tracker, now, expirationTime);
+
+        if (traceEnabled) {
+          logger.trace("{} sweeper: done", tracker.getName());
+        }
+      }
+    }
+  }
+
+  private void removeExpiredSequenceTracker(EventTracker tracker, long now, long expirationTime) {
+    for (Iterator<Map.Entry<ThreadIdentifier, EventSequenceNumberHolder>> entryIterator =
+        tracker.getRecordedEvents().entrySet().iterator(); entryIterator.hasNext();) {
+      Map.Entry<ThreadIdentifier, EventSequenceNumberHolder> entry = entryIterator.next();
+      EventSequenceNumberHolder evh = entry.getValue();
+      if (evh.expire(now, expirationTime)) {
+        if (traceEnabled) {
+          logger.trace("{} sweeper: removing {}", tracker.getName(), entry.getKey());
+        }
+        entryIterator.remove();
+      }
+    }
+  }
+
+  private void removeExpiredBulkOperations(EventTracker tracker, long now, long expirationTime) {
+    for (Iterator<Map.Entry<ThreadIdentifier, BulkOperationHolder>> entryIterator =
+        tracker.getRecordedBulkOpVersionTags().entrySet().iterator(); entryIterator.hasNext();) {
+      Map.Entry<ThreadIdentifier, BulkOperationHolder> entry = entryIterator.next();
+      BulkOperationHolder evh = entry.getValue();
+      if (evh.expire(now, expirationTime)) {
+        if (traceEnabled) {
+          logger.trace("{} sweeper: removing bulkOp {}", tracker.getName(), entry.getKey());
+        }
+        entryIterator.remove();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/0e215d4b/geode-core/src/main/java/org/apache/geode/internal/cache/event/NonDistributedEventTracker.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/event/NonDistributedEventTracker.java b/geode-core/src/main/java/org/apache/geode/internal/cache/event/NonDistributedEventTracker.java
new file mode 100644
index 0000000..f5b1831
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/event/NonDistributedEventTracker.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.event;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.EventID;
+import org.apache.geode.internal.cache.InternalCacheEvent;
+import org.apache.geode.internal.cache.ha.ThreadIdentifier;
+import org.apache.geode.internal.cache.versions.VersionTag;
+
+public class NonDistributedEventTracker implements EventTracker {
+  private static final NonDistributedEventTracker INSTANCE = new NonDistributedEventTracker();
+
+  static final String NAME = "The NonDistributedEventTracker";
+
+  public static NonDistributedEventTracker getInstance() {
+    return INSTANCE;
+  }
+
+  private NonDistributedEventTracker() {
+    // private no arg constructor to enforce singleton pattern
+  }
+
+  @Override
+  public void start() {
+
+  }
+
+  @Override
+  public void stop() {
+
+  }
+
+  @Override
+  public Map<ThreadIdentifier, EventSequenceNumberHolder> getState() {
+    return null;
+  }
+
+  @Override
+  public void recordState(InternalDistributedMember provider, Map state) {
+
+  }
+
+  @Override
+  public void recordEvent(InternalCacheEvent event) {
+
+  }
+
+  @Override
+  public boolean hasSeenEvent(InternalCacheEvent event) {
+    return false;
+  }
+
+  @Override
+  public void waitOnInitialization() throws InterruptedException {
+
+  }
+
+  @Override
+  public VersionTag findVersionTagForSequence(EventID eventId) {
+    return null;
+  }
+
+  @Override
+  public VersionTag findVersionTagForBulkOp(EventID eventId) {
+    return null;
+  }
+
+  @Override
+  public String getName() {
+    return NAME;
+  }
+
+  @Override
+  public boolean hasSeenEvent(EventID eventID) {
+    return false;
+  }
+
+  @Override
+  public boolean hasSeenEvent(EventID eventID, InternalCacheEvent tagHolder) {
+    return false;
+  }
+
+  @Override
+  public void recordBulkOpStart(EventID eventID, ThreadIdentifier membershipID) {
+
+  }
+
+  @Override
+  public void syncBulkOp(Runnable task, EventID eventId, boolean partOfTransaction) {
+    task.run();
+  }
+
+  @Override
+  public boolean isInitialized() {
+    return true;
+  }
+
+  @Override
+  public void setInitialized() {
+
+  }
+
+  @Override
+  public boolean isInitialImageProvider(DistributedMember mbr) {
+    return false;
+  }
+
+  @Override
+  public ConcurrentMap<ThreadIdentifier, BulkOperationHolder> getRecordedBulkOpVersionTags() {
+    return null;
+  }
+
+  @Override
+  public ConcurrentMap<ThreadIdentifier, EventSequenceNumberHolder> getRecordedEvents() {
+    return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/0e215d4b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java
index cc78cca..1f47897 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java
@@ -211,9 +211,9 @@ public abstract class BaseCommand implements Command {
     LocalRegion r = clientEvent.getRegion();
     VersionTag tag;
     if (clientEvent.getVersionTag() != null && clientEvent.getVersionTag().isGatewayTag()) {
-      tag = r.findVersionTagForGatewayEvent(clientEvent.getEventId());
+      tag = r.findVersionTagForEvent(clientEvent.getEventId());
     } else {
-      tag = r.findVersionTagForClientEvent(clientEvent.getEventId());
+      tag = r.findVersionTagForEvent(clientEvent.getEventId());
     }
     if (tag == null) {
       if (r instanceof DistributedRegion || r instanceof PartitionedRegion) {

http://git-wip-us.apache.org/repos/asf/geode/blob/0e215d4b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
index 6fa17a4..9bad91a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
@@ -52,11 +52,13 @@ import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.internal.cache.CachedDeserializable;
 import org.apache.geode.internal.cache.Conflatable;
+import org.apache.geode.internal.cache.event.EventTracker;
 import org.apache.geode.internal.cache.DistributedRegion;
 import org.apache.geode.internal.cache.EntryEventImpl;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.InternalRegionArguments;
 import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.event.NonDistributedEventTracker;
 import org.apache.geode.internal.cache.RegionQueue;
 import org.apache.geode.internal.cache.Token;
 import org.apache.geode.internal.cache.versions.RegionVersionVector;
@@ -1188,7 +1190,9 @@ public class SerialGatewaySenderQueue implements RegionQueue {
 
     // @override event tracker not needed for this type of region
     @Override
-    public void createEventTracker() {}
+    public EventTracker createEventTracker() {
+      return NonDistributedEventTracker.getInstance();
+    }
 
     @Override
     protected boolean shouldNotifyBridgeClients() {

http://git-wip-us.apache.org/repos/asf/geode/blob/0e215d4b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
index 61f91a0..db5f7ca 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
@@ -88,7 +88,7 @@ import org.apache.geode.internal.cache.DiskStoreFactoryImpl;
 import org.apache.geode.internal.cache.DiskStoreImpl;
 import org.apache.geode.internal.cache.DiskStoreMonitor;
 import org.apache.geode.internal.cache.DistributedRegion;
-import org.apache.geode.internal.cache.EventTracker.ExpiryTask;
+import org.apache.geode.internal.cache.event.EventTrackerExpiryTask;
 import org.apache.geode.internal.cache.ExpirationScheduler;
 import org.apache.geode.internal.cache.FilterProfile;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
@@ -1670,7 +1670,7 @@ public class CacheCreation implements InternalCache {
   }
 
   @Override
-  public ExpiryTask getEventTrackerTask() {
+  public EventTrackerExpiryTask getEventTrackerTask() {
     throw new UnsupportedOperationException(LocalizedStrings.SHOULDNT_INVOKE.toLocalizedString());
   }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/0e215d4b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java
index d76ff1b..eaaa3aa 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java
@@ -20,6 +20,7 @@ import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.anyLong;
 import static org.mockito.Mockito.*;
 
+import java.io.IOException;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -49,6 +50,9 @@ public class BucketRegionJUnitTest extends DistributedRegionJUnitTest {
   protected DistributedRegion createAndDefineRegion(boolean isConcurrencyChecksEnabled,
       RegionAttributes ra, InternalRegionArguments ira, GemFireCacheImpl cache) {
     BucketRegion br = new BucketRegion("testRegion", ra, null, cache, ira);
+    // it is necessary to set the event tracker to initialized, since initialize() in not being
+    // called on the instantiated region
+    br.getEventTracker().setInitialized();
 
     // since br is a real bucket region object, we need to tell mockito to monitor it
     br = spy(br);

http://git-wip-us.apache.org/repos/asf/geode/blob/0e215d4b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java
index ce21c67..78cdd84 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java
@@ -27,7 +27,8 @@ import org.junit.experimental.categories.Category;
 import org.apache.geode.cache.Operation;
 import org.apache.geode.cache.RegionAttributes;
 import org.apache.geode.distributed.DistributedMember;
-import org.apache.geode.internal.cache.EventTracker.BulkOpHolder;
+import org.apache.geode.internal.cache.event.BulkOperationHolder;
+import org.apache.geode.internal.cache.event.EventTracker;
 import org.apache.geode.internal.cache.ha.ThreadIdentifier;
 import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
 import org.apache.geode.internal.cache.versions.VersionTag;
@@ -112,7 +113,6 @@ public class DistributedRegionJUnitTest extends AbstractDistributedRegionJUnitTe
     DistributedRegion region = prepare(true, true);
     DistributedMember member = mock(DistributedMember.class);
     ClientProxyMembershipID memberId = mock(ClientProxyMembershipID.class);
-    doReturn(false).when(region).isUsedForPartitionedRegionBucket();
 
     byte[] memId = {1, 2, 3};
     long threadId = 1;
@@ -124,8 +124,9 @@ public class DistributedRegionJUnitTest extends AbstractDistributedRegionJUnitTe
     recordPutAllEvents(region, memId, threadId, skipCallbacks, member, memberId, size);
     EventTracker eventTracker = region.getEventTracker();
 
-    ConcurrentMap<ThreadIdentifier, BulkOpHolder> map = eventTracker.getRecordedBulkOpVersionTags();
-    BulkOpHolder holder = map.get(tid);
+    ConcurrentMap<ThreadIdentifier, BulkOperationHolder> map =
+        eventTracker.getRecordedBulkOpVersionTags();
+    BulkOperationHolder holder = map.get(tid);
 
     EntryEventImpl retryEvent = EntryEventImpl.create(region, Operation.PUTALL_CREATE, "key1",
         "value1", null, false, member, !skipCallbacks, retryEventID);
@@ -133,7 +134,7 @@ public class DistributedRegionJUnitTest extends AbstractDistributedRegionJUnitTe
     retryEvent.setPutAllOperation(mock(DistributedPutAllOperation.class));
 
     region.hasSeenEvent(retryEvent);
-    assertTrue(retryEvent.getVersionTag().equals(holder.entryVersionTags.get(retryEventID)));
+    assertTrue(retryEvent.getVersionTag().equals(holder.getEntryVersionTags().get(retryEventID)));
   }
 
   protected void recordPutAllEvents(DistributedRegion region, byte[] memId, long threadId,

http://git-wip-us.apache.org/repos/asf/geode/blob/0e215d4b/geode-core/src/test/java/org/apache/geode/internal/cache/EventTrackerDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/EventTrackerDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/EventTrackerDUnitTest.java
deleted file mode 100755
index 77c0998..0000000
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/EventTrackerDUnitTest.java
+++ /dev/null
@@ -1,486 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.internal.cache;
-
-import static org.junit.Assert.*;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-
-import org.awaitility.Awaitility;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import org.apache.geode.cache.AttributesFactory;
-import org.apache.geode.cache.Cache;
-import org.apache.geode.cache.CacheException;
-import org.apache.geode.cache.PartitionAttributesFactory;
-import org.apache.geode.cache.Region;
-import org.apache.geode.cache.RegionFactory;
-import org.apache.geode.cache.RegionShortcut;
-import org.apache.geode.cache.Scope;
-import org.apache.geode.cache.server.CacheServer;
-import org.apache.geode.cache30.CacheSerializableRunnable;
-import org.apache.geode.cache30.ClientServerTestCase;
-import org.apache.geode.distributed.internal.DistributionConfig;
-import org.apache.geode.internal.cache.EventTracker.BulkOpHolder;
-import org.apache.geode.internal.cache.ha.ThreadIdentifier;
-import org.apache.geode.test.dunit.Assert;
-import org.apache.geode.test.dunit.Host;
-import org.apache.geode.test.dunit.NetworkUtils;
-import org.apache.geode.test.dunit.SerializableRunnable;
-import org.apache.geode.test.dunit.VM;
-import org.apache.geode.test.dunit.Wait;
-import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
-import org.apache.geode.test.junit.categories.DistributedTest;
-
-/**
- * Tests <code>EventTracker</code> management.
- *
- * @since GemFire 6.5
- */
-@Category(DistributedTest.class)
-public class EventTrackerDUnitTest extends JUnit4CacheTestCase {
-
-  /** The port on which the <code>CacheServer</code> was started in this VM */
-  private static int cacheServerPort;
-
-  /** The <code>Cache</code>'s <code>ExpiryTask</code>'s ping interval */
-  private static final String MESSAGE_TRACKING_TIMEOUT = "5000";
-
-  @Override
-  public final void postTearDownCacheTestCase() throws Exception {
-    disconnectAllFromDS();
-  }
-
-  /**
-   * Tests <code>EventTracker</code> is created and destroyed when a <code>Region</code> is created
-   * and destroyed.
-   */
-  @Test
-  public void testEventTrackerCreateDestroy() throws CacheException {
-    // Verify the Cache's ExpiryTask contains no EventTrackers
-    GemFireCacheImpl cache = (GemFireCacheImpl) getCache();
-    EventTracker.ExpiryTask expiryTask = cache.getEventTrackerTask();
-    assertNotNull(expiryTask);
-
-    // We start with 3 event trackers:
-    // one for the PDX registry region
-    // one for ManagementConstants.MONITORING_REGION
-    // one for ManagementConstants.NOTIFICATION_REGION
-    final int EXPECTED_TRACKERS = 3;
-    assertEquals(EXPECTED_TRACKERS, expiryTask.getNumberOfTrackers());
-
-    // Create a distributed Region
-    AttributesFactory factory = new AttributesFactory();
-    factory.setScope(Scope.DISTRIBUTED_ACK);
-    LocalRegion region = (LocalRegion) createRegion(getName(), factory.create());
-
-    // Verify an EventTracker is created and is empty
-    EventTracker eventTracker = region.getEventTracker();
-    assertNotNull(eventTracker);
-    Map eventState = region.getEventState();
-    assertNotNull(eventState);
-    assertEquals(0, eventState.size());
-
-    // Verify it and the root region's EventTracker are added to the Cache's ExpiryTask's trackers
-    assertEquals(EXPECTED_TRACKERS + 2, expiryTask.getNumberOfTrackers());
-
-    // Destroy the Region
-    region.destroyRegion();
-
-    // Verify the EventTracker is removed from the Cache's ExpiryTask's trackers
-    assertEquals(EXPECTED_TRACKERS + 1, expiryTask.getNumberOfTrackers());
-  }
-
-  /**
-   * Tests adding threads to an <code>EventTracker</code>.
-   */
-  @Test
-  public void testEventTrackerAddThreadIdentifier() throws CacheException {
-    Host host = Host.getHost(0);
-    VM serverVM = host.getVM(0);
-    VM clientVM = host.getVM(1);
-    final String regionName = getName();
-
-    // Create Region in the server and verify tracker is created
-    serverVM.invoke(new CacheSerializableRunnable("Create server") {
-      public void run2() throws CacheException {
-        // Create a distributed Region
-        AttributesFactory factory = new AttributesFactory();
-        factory.setScope(Scope.DISTRIBUTED_ACK);
-        LocalRegion region = (LocalRegion) createRegion(regionName, factory.create());
-
-        // Verify an EventTracker is created
-        EventTracker eventTracker = region.getEventTracker();
-        assertNotNull(eventTracker);
-        try {
-          startCacheServer();
-        } catch (Exception ex) {
-          Assert.fail("While starting CacheServer", ex);
-        }
-      }
-    });
-
-    // Verify tracker in server contains no entries
-    serverVM.invoke(new CacheSerializableRunnable("Do puts") {
-      public void run2() throws CacheException {
-        LocalRegion region = (LocalRegion) getRootRegion().getSubregion(regionName);
-        Map eventState = region.getEventState();
-        assertEquals(0, eventState.size());
-      }
-    });
-
-    // Create Create Region in the client
-    final int port = serverVM.invoke(() -> EventTrackerDUnitTest.getCacheServerPort());
-    final String hostName = NetworkUtils.getServerHostName(host);
-    clientVM.invoke(new CacheSerializableRunnable("Create client") {
-      public void run2() throws CacheException {
-        getCache();
-        AttributesFactory factory = new AttributesFactory();
-        factory.setScope(Scope.LOCAL);
-        ClientServerTestCase.configureConnectionPool(factory, hostName, port, -1, false, -1, -1,
-            null);
-        createRegion(regionName, factory.create());
-      }
-    });
-
-    // Do puts in the client
-    clientVM.invoke(new CacheSerializableRunnable("Do puts") {
-      public void run2() throws CacheException {
-        Region region = getRootRegion().getSubregion(regionName);
-        for (int i = 0; i < 10; i++) {
-          region.put(i, i);
-        }
-      }
-    });
-
-    // Verify tracker in server contains an entry for client thread
-    serverVM.invoke(new CacheSerializableRunnable("Do puts") {
-      public void run2() throws CacheException {
-        LocalRegion region = (LocalRegion) getRootRegion().getSubregion(regionName);
-        Map eventState = region.getEventState();
-        assertEquals(1, eventState.size());
-      }
-    });
-  }
-
-  /**
-   * Tests adding events to and removing events from an <code>EventTracker</code>.
-   */
-  @Test
-  public void testEventTrackerAddRemoveThreadIdentifier() throws CacheException {
-    Host host = Host.getHost(0);
-    VM serverVM = host.getVM(0);
-    VM clientVM = host.getVM(1);
-    final String regionName = getName();
-
-    // Create Region in the server and verify tracker is created
-    serverVM.invoke(new CacheSerializableRunnable("Create server") {
-      public void run2() throws CacheException {
-        // Set the message tracking timeout
-        System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "messageTrackingTimeout",
-            MESSAGE_TRACKING_TIMEOUT);
-
-        // Create a distributed Region
-        AttributesFactory factory = new AttributesFactory();
-        factory.setScope(Scope.DISTRIBUTED_ACK);
-        LocalRegion region = (LocalRegion) createRegion(regionName, factory.create());
-
-        // Verify an EventTracker is created
-        EventTracker eventTracker = region.getEventTracker();
-        assertNotNull(eventTracker);
-        try {
-          startCacheServer();
-        } catch (Exception ex) {
-          Assert.fail("While starting CacheServer", ex);
-        }
-      }
-    });
-
-    // Verify tracker in server contains no entries
-    serverVM.invoke(new CacheSerializableRunnable("Do puts") {
-      public void run2() throws CacheException {
-        LocalRegion region = (LocalRegion) getRootRegion().getSubregion(regionName);
-        Map eventState = region.getEventState();
-        assertEquals(0, eventState.size());
-      }
-    });
-
-    // Create Create Region in the client
-    final int port = serverVM.invoke(() -> EventTrackerDUnitTest.getCacheServerPort());
-    final String hostName = NetworkUtils.getServerHostName(host);
-    clientVM.invoke(new CacheSerializableRunnable("Create client") {
-      public void run2() throws CacheException {
-        getCache();
-        AttributesFactory factory = new AttributesFactory();
-        factory.setScope(Scope.LOCAL);
-        ClientServerTestCase.configureConnectionPool(factory, hostName, port, -1, false, -1, -1,
-            null);
-        createRegion(regionName, factory.create());
-      }
-    });
-
-    // Do puts in the client
-    clientVM.invoke(new CacheSerializableRunnable("Do puts") {
-      public void run2() throws CacheException {
-        Region region = getRootRegion().getSubregion(regionName);
-        for (int i = 0; i < 10; i++) {
-          region.put(i, i);
-        }
-      }
-    });
-
-    // Verify tracker in server
-    serverVM.invoke(new CacheSerializableRunnable("Do puts") {
-      public void run2() throws CacheException {
-        // First verify it contains an entry
-        LocalRegion region = (LocalRegion) getRootRegion().getSubregion(regionName);
-        Map eventState = region.getEventState();
-        assertEquals(1, eventState.size());
-
-        // Pause for the message tracking timeout
-        int waitTime = Integer.parseInt(MESSAGE_TRACKING_TIMEOUT) * 3;
-        Wait.pause(waitTime);
-
-        // Verify the server no longer contains an entry
-        eventState = region.getEventState();
-        assertEquals(0, eventState.size());
-      }
-    });
-  }
-
-  /**
-   * Test to make sure we don't leak put all events in the event tracker after multiple putAlls
-   */
-  @Test
-  public void testPutAllHoldersInEventTracker() {
-    Host host = Host.getHost(0);
-    VM vm0 = host.getVM(0);
-    VM vm1 = host.getVM(1);
-    VM vm2 = host.getVM(2);
-
-    SerializableRunnable createRegion = new SerializableRunnable("createRegion") {
-
-      public void run() {
-        Cache cache = getCache();
-        RegionFactory<Object, Object> rf =
-            cache.createRegionFactory(RegionShortcut.PARTITION_REDUNDANT);
-        PartitionAttributesFactory paf = new PartitionAttributesFactory();
-        paf.setRedundantCopies(1);
-        paf.setTotalNumBuckets(3);
-        rf.setPartitionAttributes(paf.create());
-        rf.setConcurrencyChecksEnabled(true);
-        rf.create("partitioned");
-
-        rf = cache.createRegionFactory(RegionShortcut.REPLICATE);
-        rf.setConcurrencyChecksEnabled(true);
-        rf.create("replicate");
-        try {
-          startCacheServer();
-        } catch (Exception ex) {
-          Assert.fail("While starting CacheServer", ex);
-        }
-      }
-    };
-
-    vm0.invoke(createRegion);
-    vm1.invoke(createRegion);
-
-    // Create Create Region in the client
-    final int port = vm0.invoke(() -> EventTrackerDUnitTest.getCacheServerPort());
-    final String hostName = NetworkUtils.getServerHostName(host);
-    vm2.invoke(new CacheSerializableRunnable("Create client") {
-      public void run2() throws CacheException {
-        getCache();
-        AttributesFactory factory = new AttributesFactory();
-        factory.setScope(Scope.LOCAL);
-        ClientServerTestCase.configureConnectionPool(factory, hostName, port, -1, false, -1, -1,
-            null);
-        createRootRegion("partitioned", factory.create());
-        createRootRegion("replicate", factory.create());
-      }
-    });
-
-    doTwoPutAlls(vm2, "partitioned");
-    doTwoPutAlls(vm2, "replicate");
-
-    // Make sure that the event tracker for each bucket only records the last
-    // event.
-    checkBucketEventTracker(vm0, 0, 3);
-    checkBucketEventTracker(vm1, 0, 3);
-    checkBucketEventTracker(vm0, 1, 3);
-    checkBucketEventTracker(vm1, 1, 3);
-    checkBucketEventTracker(vm0, 2, 3);
-    checkBucketEventTracker(vm1, 2, 3);
-
-    checkReplicateEventTracker(vm0, 9);
-    checkReplicateEventTracker(vm1, 9);
-  }
-
-  private void doTwoPutAlls(VM vm, final String regionName) {
-    SerializableRunnable createData = new SerializableRunnable("putAlls") {
-
-      public void run() {
-        Cache cache = getCache();
-        Region region = cache.getRegion(regionName);
-
-        Map putAllMap = new HashMap();
-        for (int i = 0; i < 9; i++) {
-          putAllMap.put(i, i);
-        }
-        region.putAll(putAllMap);
-
-        putAllMap.clear();
-        for (int i = 10; i < 19; i++) {
-          putAllMap.put(i, i);
-        }
-        region.putAll(putAllMap);
-      }
-    };
-
-    vm.invoke(createData);
-  }
-
-  private SerializableRunnable checkReplicateEventTracker(VM vm, final int expectedEntryCount) {
-    SerializableRunnable checkEventTracker = new SerializableRunnable("checkEventTracker") {
-
-      public void run() {
-        Cache cache = getCache();
-        DistributedRegion region = (DistributedRegion) cache.getRegion("replicate");
-        checkEventTracker(region, expectedEntryCount);
-      }
-    };
-    vm.invoke(checkEventTracker);
-    return checkEventTracker;
-  }
-
-  private SerializableRunnable checkBucketEventTracker(VM vm, final int bucketNumber,
-      final int expectedEntryCount) {
-    SerializableRunnable checkEventTracker = new SerializableRunnable("checkEventTracker") {
-
-      public void run() {
-        Cache cache = getCache();
-        PartitionedRegion region = (PartitionedRegion) cache.getRegion("partitioned");
-        BucketRegion br = region.getBucketRegion(bucketNumber);
-
-        checkEventTracker(br, expectedEntryCount);
-      }
-    };
-    vm.invoke(checkEventTracker);
-    return checkEventTracker;
-  }
-
-  private void checkEventTracker(LocalRegion region, int numberOfEvents) {
-    EventTracker tracker = region.getEventTracker();
-    ConcurrentMap<ThreadIdentifier, BulkOpHolder> memberToTags =
-        tracker.getRecordedBulkOpVersionTags();
-    assertEquals("memberToTags=" + memberToTags, 1, memberToTags.size());
-    BulkOpHolder holder = memberToTags.values().iterator().next();
-    // We expect the holder to retain only the last putAll that was performed.
-    assertEquals("entryToVersionTags=" + holder.entryVersionTags, numberOfEvents,
-        holder.entryVersionTags.size());
-  }
-
-  protected void startCacheServer() throws IOException {
-    CacheServer cacheServer = getCache().addCacheServer();
-    cacheServer.setPort(0);
-    cacheServer.start();
-    cacheServerPort = cacheServer.getPort();
-  }
-
-  protected static int getCacheServerPort() {
-    return cacheServerPort;
-  }
-
-  /**
-   * Tests event track is initialized after gii
-   */
-  @Test
-  public void testEventTrackerIsInitalized() throws CacheException {
-    Host host = Host.getHost(0);
-    VM vm0 = host.getVM(0);
-    VM vm1 = host.getVM(1);
-    VM vm2 = host.getVM(2);
-
-    createPRInVMs(vm0, vm1, vm2);
-
-    createPR();
-
-    doPutsInVMs(vm0, vm1, vm2);
-
-    doPuts();
-
-    verifyEventTrackerContent();
-
-    // close the region
-    getCache().getRegion(getName()).close();
-
-    // create the region again.
-    createPR();
-
-    for (int i = 0; i < 12; i++) {
-      waitEntryIsLocal(i);
-    }
-
-    // verify event track initialized after create region
-    verifyEventTrackerContent();
-
-  }
-
-  private void waitEntryIsLocal(int i) {
-    Awaitility.await().pollInterval(10, TimeUnit.MILLISECONDS).pollDelay(10, TimeUnit.MILLISECONDS)
-        .atMost(30, TimeUnit.SECONDS)
-        .until(() -> getCache().getRegion(getName()).getEntry(i) != null);
-  }
-
-  private void verifyEventTrackerContent() {
-    PartitionedRegion pr = (PartitionedRegion) getCache().getRegion(getName());
-    BucketRegion br = pr.getDataStore().getLocalBucketById(0);
-    Map<?, ?> eventStates = br.getEventState();
-    assertTrue(eventStates.size() == 4);
-  }
-
-  public void createPRInVMs(VM... vms) {
-    for (VM vm : vms) {
-      vm.invoke(() -> createPR());
-    }
-  }
-
-  private void createPR() {
-    PartitionAttributesFactory paf =
-        new PartitionAttributesFactory().setRedundantCopies(3).setTotalNumBuckets(4);
-    RegionFactory fact = getCache().createRegionFactory(RegionShortcut.PARTITION)
-        .setPartitionAttributes(paf.create());
-    fact.create(getName());
-  }
-
-  public void doPutsInVMs(VM... vms) {
-    for (VM vm : vms) {
-      vm.invoke(() -> doPuts());
-    }
-  }
-
-  private void doPuts() {
-    Region region = getCache().getRegion(getName());
-    for (int i = 0; i < 12; i++) {
-      region.put(i, i);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/geode/blob/0e215d4b/geode-core/src/test/java/org/apache/geode/internal/cache/EventTrackerTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/EventTrackerTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/EventTrackerTest.java
deleted file mode 100644
index d74b3d5..0000000
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/EventTrackerTest.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.geode.internal.cache;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
-import java.util.concurrent.ConcurrentMap;
-
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.apache.geode.CancelCriterion;
-import org.apache.geode.cache.DataPolicy;
-import org.apache.geode.cache.Operation;
-import org.apache.geode.cache.RegionAttributes;
-import org.apache.geode.distributed.DistributedMember;
-import org.apache.geode.internal.cache.EventTracker.BulkOpHolder;
-import org.apache.geode.internal.cache.ha.ThreadIdentifier;
-import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
-import org.apache.geode.internal.cache.versions.VersionTag;
-import org.apache.geode.test.junit.categories.UnitTest;
-
-@Category(UnitTest.class)
-public class EventTrackerTest {
-  LocalRegion lr;
-  RegionAttributes<?, ?> ra;
-  EntryEventImpl[] events;
-  EventTracker eventTracker;
-  ClientProxyMembershipID memberId;
-  DistributedMember member;
-
-  @Before
-  public void setUp() {
-    lr = mock(LocalRegion.class);
-    ra = mock(RegionAttributes.class);
-    when(lr.createStopper()).thenCallRealMethod();
-    CancelCriterion stopper = lr.createStopper();
-    when(lr.getStopper()).thenReturn(stopper);
-    memberId = mock(ClientProxyMembershipID.class);
-    when(lr.getAttributes()).thenReturn(ra);
-    when(ra.getDataPolicy()).thenReturn(mock(DataPolicy.class));
-    when(lr.getConcurrencyChecksEnabled()).thenReturn(true);
-
-    member = mock(DistributedMember.class);
-  }
-
-  @Test
-  public void retriedBulkOpDoesNotRemoveRecordedBulkOpVersionTags() {
-    byte[] memId = {1, 2, 3};
-    long threadId = 1;
-    long retrySeqId = 1;
-    ThreadIdentifier tid = new ThreadIdentifier(memId, threadId);
-    EventID retryEventID = new EventID(memId, threadId, retrySeqId);
-    boolean skipCallbacks = true;
-    int size = 5;
-    recordPutAllEvents(memId, threadId, skipCallbacks, size);
-
-    ConcurrentMap<ThreadIdentifier, BulkOpHolder> map = eventTracker.getRecordedBulkOpVersionTags();
-    BulkOpHolder holder = map.get(tid);
-    int beforeSize = holder.entryVersionTags.size();
-
-    eventTracker.recordBulkOpStart(tid, retryEventID);
-    map = eventTracker.getRecordedBulkOpVersionTags();
-    holder = map.get(tid);
-    // Retried bulk op should not remove exiting BulkOpVersionTags
-    assertTrue(holder.entryVersionTags.size() == beforeSize);
-  }
-
-  private void recordPutAllEvents(byte[] memId, long threadId, boolean skipCallbacks, int size) {
-    events = new EntryEventImpl[size];
-    eventTracker = new EventTracker(lr);
-    for (int i = 0; i < size; i++) {
-      events[i] = EntryEventImpl.create(lr, Operation.PUTALL_CREATE, "key" + i, "value" + i, null,
-          false, member, !skipCallbacks, new EventID(memId, threadId, i + 1));
-      events[i].setContext(memberId);
-      events[i].setVersionTag(mock(VersionTag.class));
-      eventTracker.recordEvent(events[i]);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/geode/blob/0e215d4b/geode-core/src/test/java/org/apache/geode/internal/cache/IteratorDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/IteratorDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/IteratorDUnitTest.java
index 6ca11b2..a1922cf 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/IteratorDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/IteratorDUnitTest.java
@@ -58,7 +58,7 @@ public class IteratorDUnitTest extends JUnit4CacheTestCase {
     r.put("key3", "value3");
     LocalRegion lr = (LocalRegion) r;
     // simulate a removed key
-    // lr.getRegionMap().getEntry("key")._setValue(Token.REMOVED_PHASE1);
+    // region.getRegionMap().getEntry("key")._setValue(Token.REMOVED_PHASE1);
     lr.getRegionMap().getEntry("key").setValue(lr, Token.REMOVED_PHASE1);
     Iterator it = r.keySet().iterator();
     int numKeys = 0;

http://git-wip-us.apache.org/repos/asf/geode/blob/0e215d4b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionDUnitTestCase.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionDUnitTestCase.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionDUnitTestCase.java
index 54263b9..c3aff4c 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionDUnitTestCase.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionDUnitTestCase.java
@@ -192,9 +192,9 @@ public class PartitionedRegionDUnitTestCase extends JUnit4CacheTestCase {
       final long recoveryDelay) {
     return new CacheSerializableRunnable("getCreateMultiplePRregion") {
       public void run2() throws CacheException {
-        // final Random ra = new Random();
+        // final Random regionAttributes = new Random();
         for (int i = 0; i < maxIndex; i++) {
-          // final int rind = ra.nextInt(maxIndex);
+          // final int rind = regionAttributes.nextInt(maxIndex);
           try {
             getCache().createRegion(prPrefix + i, PartitionedRegionTestHelper
                 .createRegionAttrsForPR(redundancy, localmaxMemory, recoveryDelay));

http://git-wip-us.apache.org/repos/asf/geode/blob/0e215d4b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionSingleNodeOperationsJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionSingleNodeOperationsJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionSingleNodeOperationsJUnitTest.java
index e69aa95..cd8408a 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionSingleNodeOperationsJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionSingleNodeOperationsJUnitTest.java
@@ -581,11 +581,12 @@ public class PartitionedRegionSingleNodeOperationsJUnitTest {
    * 
    * Region distRegion = null;
    * 
-   * AttributesFactory af = new AttributesFactory(); RegionAttributes ra; // setting property
-   * af.setScope(Scope.DISTRIBUTED_ACK); // creating region attributes ra = af.create(); try {
-   * distRegion = PartitionedRegionTestHelper.createCache().createRegion( diRegion, ra); } catch
-   * (RegionExistsException rex) { distRegion = PartitionedRegionTestHelper.createCache()
-   * .getRegion(diRegion); } // Closing the regions distRegion.close(); pr.close();
+   * AttributesFactory af = new AttributesFactory(); RegionAttributes regionAttributes; // setting
+   * property af.setScope(Scope.DISTRIBUTED_ACK); // creating region attributes regionAttributes =
+   * af.create(); try { distRegion = PartitionedRegionTestHelper.createCache().createRegion(
+   * diRegion, regionAttributes); } catch (RegionExistsException rex) { distRegion =
+   * PartitionedRegionTestHelper.createCache() .getRegion(diRegion); } // Closing the regions
+   * distRegion.close(); pr.close();
    * 
    * if (!pr.getCache().equals(distRegion.getCache())) {
    * fail("testValidateCloseFunction: getCache is not matching. "); } else { if

http://git-wip-us.apache.org/repos/asf/geode/blob/0e215d4b/geode-core/src/test/java/org/apache/geode/internal/cache/event/DistributedEventTrackerTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/event/DistributedEventTrackerTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/event/DistributedEventTrackerTest.java
new file mode 100644
index 0000000..fd69f98
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/event/DistributedEventTrackerTest.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.event;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.RegionAttributes;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.EntryEventImpl;
+import org.apache.geode.internal.cache.EventID;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.InternalCacheEvent;
+import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.ha.ThreadIdentifier;
+import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
+import org.apache.geode.internal.cache.versions.VersionTag;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+/**
+ * Unit tests for {@code DistributedEventTracker}: sequence-number recording, bulk-operation
+ * version-tag retention, initialization/GII state handling, and version-tag lookup for both
+ * single and bulk operations. All collaborators are mocked; no cache is started.
+ */
+@Category(UnitTest.class)
+public class DistributedEventTrackerTest {
+  LocalRegion region;
+  RegionAttributes<?, ?> regionAttributes;
+  DistributedEventTracker eventTracker;
+  ClientProxyMembershipID memberId;
+  DistributedMember member;
+
+  @Before
+  public void setup() {
+    region = mock(LocalRegion.class);
+    regionAttributes = mock(RegionAttributes.class);
+    when(region.createStopper()).thenCallRealMethod();
+    memberId = mock(ClientProxyMembershipID.class);
+    when(region.getAttributes()).thenReturn(regionAttributes);
+    when(regionAttributes.getDataPolicy()).thenReturn(mock(DataPolicy.class));
+    when(region.getConcurrencyChecksEnabled()).thenReturn(true);
+
+    member = mock(DistributedMember.class);
+    eventTracker = new DistributedEventTracker(region.getCache(), mock(CancelCriterion.class),
+        region.getName());
+  }
+
+  @Test
+  public void retriedBulkOpDoesNotRemoveRecordedBulkOpVersionTags() {
+    byte[] memId = {1, 2, 3};
+    long threadId = 1;
+    long retrySeqId = 1;
+    ThreadIdentifier tid = new ThreadIdentifier(memId, threadId);
+    EventID retryEventID = new EventID(memId, threadId, retrySeqId);
+    boolean skipCallbacks = true;
+    int size = 5;
+    recordPutAllEvents(memId, threadId, skipCallbacks, size);
+
+    ConcurrentMap<ThreadIdentifier, BulkOperationHolder> map =
+        eventTracker.getRecordedBulkOpVersionTags();
+    BulkOperationHolder holder = map.get(tid);
+    int beforeSize = holder.getEntryVersionTags().size();
+
+    eventTracker.recordBulkOpStart(retryEventID, tid);
+    map = eventTracker.getRecordedBulkOpVersionTags();
+    holder = map.get(tid);
+    // Retried bulk op should not remove existing BulkOpVersionTags
+    assertEquals(beforeSize, holder.getEntryVersionTags().size());
+  }
+
+  /**
+   * Records {@code size} putAll events in the tracker, one per sequence id starting at 1.
+   * (Previously this also built and recorded a second, identical event per iteration;
+   * {@link #putEvent} already records the event, so each event is now recorded exactly once.)
+   */
+  private void recordPutAllEvents(byte[] memId, long threadId, boolean skipCallbacks, int size) {
+    for (int i = 0; i < size; i++) {
+      putEvent("key" + i, "value" + i, memId, threadId, skipCallbacks, i + 1);
+    }
+  }
+
+  /** Records a PUTALL_CREATE event carrying a fresh mocked version tag. */
+  private void putEvent(String key, String value, byte[] memId, long threadId,
+      boolean skipCallbacks, int sequenceId) {
+    putEvent(key, value, memId, threadId, skipCallbacks, sequenceId, mock(VersionTag.class));
+  }
+
+  /** Creates a PUTALL_CREATE event with the given version tag and records it in the tracker. */
+  private void putEvent(String key, String value, byte[] memId, long threadId,
+      boolean skipCallbacks, int sequenceId, VersionTag tag) {
+    EntryEventImpl event = EntryEventImpl.create(region, Operation.PUTALL_CREATE, key, value, null,
+        false, member, !skipCallbacks, new EventID(memId, threadId, sequenceId));
+    event.setContext(memberId);
+    event.setVersionTag(tag);
+    eventTracker.recordEvent(event);
+  }
+
+  @Test
+  public void returnsCorrectNameOfCache() {
+    String testName = "testing";
+    when(region.getName()).thenReturn(testName);
+    eventTracker = new DistributedEventTracker(region.getCache(), mock(CancelCriterion.class),
+        region.getName());
+    assertEquals("Event Tracker for " + testName, eventTracker.getName());
+  }
+
+  @Test
+  public void initializationCorrectlyReadiesTheTracker() throws InterruptedException {
+    assertFalse(eventTracker.isInitialized());
+    eventTracker.setInitialized();
+    assertTrue(eventTracker.isInitialized());
+    // must not block once the tracker is initialized
+    eventTracker.waitOnInitialization();
+  }
+
+  @Test
+  public void startAndStopAddAndRemoveTrackerFromExpiryTask() {
+    EventTrackerExpiryTask task = mock(EventTrackerExpiryTask.class);
+    InternalCache cache = mock(InternalCache.class);
+    when(region.getCache()).thenReturn(cache);
+    when(cache.getEventTrackerTask()).thenReturn(task);
+    eventTracker = new DistributedEventTracker(region.getCache(), mock(CancelCriterion.class),
+        region.getName());
+    eventTracker.start();
+    verify(task, times(1)).addTracker(eventTracker);
+    eventTracker.stop();
+    verify(task, times(1)).removeTracker(eventTracker);
+  }
+
+  @Test
+  public void returnsEmptyMapIfRecordedEventsAreEmpty() {
+    assertEquals(0, eventTracker.getState().size());
+  }
+
+  @Test
+  public void returnsMapContainingSequenceIdHoldersCurrentlyPresent() {
+    EventSequenceNumberHolder sequenceIdHolder = new EventSequenceNumberHolder(0L, null);
+    ThreadIdentifier threadId = new ThreadIdentifier(new byte[0], 0L);
+    eventTracker.recordSequenceNumber(threadId, sequenceIdHolder);
+    Map<ThreadIdentifier, EventSequenceNumberHolder> state = eventTracker.getState();
+    assertEquals(1, state.size());
+    EventSequenceNumberHolder returnedHolder = state.get(threadId);
+    assertNotNull(returnedHolder);
+    // the version tag is stripped out on purpose, so passed in object and returned one are not
+    // equal to each other
+    assertNull(returnedHolder.getVersionTag());
+    assertEquals(sequenceIdHolder.getLastSequenceNumber(), returnedHolder.getLastSequenceNumber());
+  }
+
+  @Test
+  public void setToInitializedWhenStateRecorded() {
+    eventTracker.recordState(null, Collections.emptyMap());
+    assertTrue(eventTracker.isInitialized());
+  }
+
+  @Test
+  public void setsInitialImageProvidedWhenStateRecorded() {
+    InternalDistributedMember distributedMember = mock(InternalDistributedMember.class);
+    eventTracker.recordState(distributedMember, Collections.emptyMap());
+    assertTrue(eventTracker.isInitialImageProvider(distributedMember));
+  }
+
+  @Test
+  public void entryInRecordedStateStoredWhenNotInCurrentState() {
+    EventSequenceNumberHolder sequenceIdHolder = new EventSequenceNumberHolder(0L, null);
+    ThreadIdentifier threadId = new ThreadIdentifier(new byte[0], 0L);
+    Map<ThreadIdentifier, EventSequenceNumberHolder> state =
+        Collections.singletonMap(threadId, sequenceIdHolder);
+    eventTracker.recordState(null, state);
+    Map<ThreadIdentifier, EventSequenceNumberHolder> storedState = eventTracker.getState();
+    assertEquals(storedState.get(threadId).getLastSequenceNumber(),
+        sequenceIdHolder.getLastSequenceNumber());
+  }
+
+  @Test
+  public void entryInRecordedStateNotStoredIfAlreadyInCurrentState() {
+    EventSequenceNumberHolder originalSequenceIdHolder = new EventSequenceNumberHolder(0L, null);
+    ThreadIdentifier threadId = new ThreadIdentifier(new byte[0], 0L);
+    Map<ThreadIdentifier, EventSequenceNumberHolder> state =
+        Collections.singletonMap(threadId, originalSequenceIdHolder);
+    eventTracker.recordState(null, state);
+
+    // a second recordState for the same thread must not overwrite the original entry
+    EventSequenceNumberHolder newSequenceIdHolder = new EventSequenceNumberHolder(1L, null);
+    Map<ThreadIdentifier, EventSequenceNumberHolder> newState =
+        Collections.singletonMap(threadId, newSequenceIdHolder);
+    eventTracker.recordState(null, newState);
+
+    Map<ThreadIdentifier, EventSequenceNumberHolder> storedState = eventTracker.getState();
+    assertEquals(storedState.get(threadId).getLastSequenceNumber(),
+        originalSequenceIdHolder.getLastSequenceNumber());
+  }
+
+  @Test
+  public void hasSeenEventReturnsFalseForEventWithNoID() {
+    InternalCacheEvent event = mock(InternalCacheEvent.class);
+    when(event.getEventId()).thenReturn(null);
+    assertFalse(eventTracker.hasSeenEvent(event));
+  }
+
+  @Test
+  public void hasSeenEventReturnsFalseForNullEventID() {
+    assertFalse(eventTracker.hasSeenEvent((EventID) null));
+    assertFalse(eventTracker.hasSeenEvent(null, null));
+  }
+
+  @Test
+  public void hasNotSeenEventIDThatIsNotInRecordedEvents() {
+    EventID eventID = new EventID(new byte[0], 0L, 0L);
+    assertFalse(eventTracker.hasSeenEvent(eventID));
+  }
+
+  @Test
+  public void hasSeenEventIDThatIsInRecordedEvents() {
+    EventID eventID = new EventID(new byte[0], 0L, 0L);
+    recordSequence(eventID);
+    assertTrue(eventTracker.hasSeenEvent(eventID));
+  }
+
+  @Test
+  public void hasNotSeenEventIDWhoseSequenceIDIsMarkedRemoved() {
+    EventID eventID = new EventID(new byte[0], 0L, 0L);
+    EventSequenceNumberHolder sequenceIdHolder =
+        new EventSequenceNumberHolder(eventID.getSequenceID(), null);
+    sequenceIdHolder.setRemoved(true);
+    ThreadIdentifier threadId = new ThreadIdentifier(new byte[0], 0L);
+    eventTracker.recordSequenceNumber(threadId, sequenceIdHolder);
+
+    assertFalse(eventTracker.hasSeenEvent(eventID));
+  }
+
+  @Test
+  public void hasNotSeenEventIDWhoseSequenceIDIsLargerThanSeen() {
+    EventID eventID = new EventID(new byte[0], 0L, 0L);
+    recordSequence(eventID);
+
+    EventID higherSequenceID = new EventID(new byte[0], 0L, 1);
+    assertFalse(eventTracker.hasSeenEvent(higherSequenceID));
+  }
+
+  @Test
+  public void returnsNoTagIfNoSequenceForEvent() {
+    EventID eventID = new EventID(new byte[0], 0L, 1L);
+    assertNull(eventTracker.findVersionTagForSequence(eventID));
+  }
+
+  @Test
+  public void returnsNoTagIfSequencesDoNotMatchForEvent() {
+    EventID eventID = new EventID(new byte[0], 0L, 1);
+    recordSequence(eventID);
+    assertNull(eventTracker.findVersionTagForSequence(eventID));
+  }
+
+  @Test
+  public void returnsCorrectTagForEvent() {
+    EventID eventID = new EventID(new byte[0], 0L, 0L);
+    EventSequenceNumberHolder sequenceIdHolder = recordSequence(eventID);
+    assertEquals(sequenceIdHolder.getVersionTag(), eventTracker.findVersionTagForSequence(eventID));
+  }
+
+  @Test
+  public void returnsNoTagIfNoBulkOpWhenNoEventGiven() {
+    assertNull(eventTracker.findVersionTagForBulkOp(null));
+  }
+
+  @Test
+  public void returnsNoTagIfNoBulkOpForEventWithSequence() {
+    EventID eventID = new EventID(new byte[0], 0L, 1L);
+    assertNull(eventTracker.findVersionTagForBulkOp(eventID));
+  }
+
+  @Test
+  public void returnsNoTagIfBulkOpsDoNotMatchForEvent() {
+    putEvent("key", "value", new byte[0], 0, false, 0);
+    EventID eventIDWithoutBulkOp = new EventID(new byte[0], 0L, 1);
+    assertNull(eventTracker.findVersionTagForBulkOp(eventIDWithoutBulkOp));
+  }
+
+  @Test
+  public void returnsCorrectTagForEventWithBulkOp() {
+    EventID eventID = new EventID(new byte[0], 0L, 0L);
+    VersionTag tag = mock(VersionTag.class);
+    putEvent("key", "value", new byte[0], 0, false, 0, tag);
+    assertEquals(tag, eventTracker.findVersionTagForBulkOp(eventID));
+  }
+
+  @Test
+  public void executesABulkOperation() {
+    EventID eventID = new EventID(new byte[0], 0L, 1L);
+    Runnable bulkOperation = mock(Runnable.class);
+    eventTracker.syncBulkOp(bulkOperation, eventID, false);
+    verify(bulkOperation, times(1)).run();
+  }
+
+  @Test
+  public void executesRunnableIfNotPartOfATransaction() {
+    EventID eventID = new EventID(new byte[0], 0L, 1L);
+    Runnable bulkOperation = mock(Runnable.class);
+    eventTracker.syncBulkOp(bulkOperation, eventID, true);
+    verify(bulkOperation, times(1)).run();
+  }
+
+  /** Records the event's sequence number under a ThreadIdentifier derived from its thread id. */
+  private EventSequenceNumberHolder recordSequence(EventID eventID) {
+    EventSequenceNumberHolder sequenceIdHolder =
+        new EventSequenceNumberHolder(eventID.getSequenceID(), null);
+    ThreadIdentifier threadIdentifier = new ThreadIdentifier(new byte[0], eventID.getThreadID());
+    eventTracker.recordSequenceNumber(threadIdentifier, sequenceIdHolder);
+    return sequenceIdHolder;
+  }
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/0e215d4b/geode-core/src/test/java/org/apache/geode/internal/cache/event/EventTrackerDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/event/EventTrackerDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/event/EventTrackerDUnitTest.java
new file mode 100755
index 0000000..b85827c
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/event/EventTrackerDUnitTest.java
@@ -0,0 +1,489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.event;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+
+import org.awaitility.Awaitility;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.AttributesFactory;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheException;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.Scope;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.cache30.CacheSerializableRunnable;
+import org.apache.geode.cache30.ClientServerTestCase;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.internal.cache.BucketRegion;
+import org.apache.geode.internal.cache.DistributedRegion;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.ha.ThreadIdentifier;
+import org.apache.geode.test.dunit.Assert;
+import org.apache.geode.test.dunit.Host;
+import org.apache.geode.test.dunit.NetworkUtils;
+import org.apache.geode.test.dunit.SerializableRunnable;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.Wait;
+import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
+import org.apache.geode.test.junit.categories.DistributedTest;
+
+/**
+ * Distributed tests for <code>EventTracker</code> management: tracker lifecycle across region
+ * create/destroy, per-client-thread tracking, expiry of tracked events, putAll version-tag
+ * retention, and tracker initialization after GII.
+ *
+ * @since GemFire 6.5
+ */
+@Category(DistributedTest.class)
+public class EventTrackerDUnitTest extends JUnit4CacheTestCase {
+
+  /** The port on which the <code>CacheServer</code> was started in this VM */
+  private static int cacheServerPort;
+
+  /** The <code>Cache</code>'s <code>ExpiryTask</code>'s ping interval */
+  private static final String MESSAGE_TRACKING_TIMEOUT = "5000";
+
+  @Override
+  public final void postTearDownCacheTestCase() throws Exception {
+    disconnectAllFromDS();
+  }
+
+  /**
+   * Tests <code>EventTracker</code> is created and destroyed when a <code>Region</code> is created
+   * and destroyed.
+   */
+  @Test
+  public void testEventTrackerCreateDestroy() throws CacheException {
+    // Verify the Cache's ExpiryTask contains no EventTrackers
+    GemFireCacheImpl cache = (GemFireCacheImpl) getCache();
+    EventTrackerExpiryTask expiryTask = cache.getEventTrackerTask();
+    assertNotNull(expiryTask);
+
+    // We start with 3 event trackers:
+    // one for the PDX registry region
+    // one for ManagementConstants.MONITORING_REGION
+    // one for ManagementConstants.NOTIFICATION_REGION
+    final int EXPECTED_TRACKERS = 3;
+    assertEquals(EXPECTED_TRACKERS, expiryTask.getNumberOfTrackers());
+
+    // Create a distributed Region
+    AttributesFactory factory = new AttributesFactory();
+    factory.setScope(Scope.DISTRIBUTED_ACK);
+    LocalRegion region = (LocalRegion) createRegion(getName(), factory.create());
+
+    // Verify an EventTracker is created and is empty
+    EventTracker eventTracker = region.getEventTracker();
+    assertNotNull(eventTracker);
+    Map eventState = region.getEventState();
+    assertNotNull(eventState);
+    assertEquals(0, eventState.size());
+
+    // Verify it and the root region's EventTracker are added to the Cache's ExpiryTask's trackers
+    assertEquals(EXPECTED_TRACKERS + 2, expiryTask.getNumberOfTrackers());
+
+    // Destroy the Region
+    region.destroyRegion();
+
+    // Verify the EventTracker is removed from the Cache's ExpiryTask's trackers
+    assertEquals(EXPECTED_TRACKERS + 1, expiryTask.getNumberOfTrackers());
+  }
+
+  /**
+   * Tests adding threads to an <code>EventTracker</code>.
+   */
+  @Test
+  public void testEventTrackerAddThreadIdentifier() throws CacheException {
+    Host host = Host.getHost(0);
+    VM serverVM = host.getVM(0);
+    VM clientVM = host.getVM(1);
+    final String regionName = getName();
+
+    // Create Region in the server and verify tracker is created
+    serverVM.invoke(new CacheSerializableRunnable("Create server") {
+      public void run2() throws CacheException {
+        // Create a distributed Region
+        AttributesFactory factory = new AttributesFactory();
+        factory.setScope(Scope.DISTRIBUTED_ACK);
+        LocalRegion region = (LocalRegion) createRegion(regionName, factory.create());
+
+        // Verify an EventTracker is created
+        EventTracker eventTracker = region.getEventTracker();
+        assertNotNull(eventTracker);
+        try {
+          startCacheServer();
+        } catch (Exception ex) {
+          Assert.fail("While starting CacheServer", ex);
+        }
+      }
+    });
+
+    // Verify tracker in server contains no entries
+    serverVM.invoke(new CacheSerializableRunnable("Verify empty event state") {
+      public void run2() throws CacheException {
+        LocalRegion region = (LocalRegion) getRootRegion().getSubregion(regionName);
+        Map eventState = region.getEventState();
+        assertEquals(0, eventState.size());
+      }
+    });
+
+    // Create the Region in the client
+    final int port = serverVM.invoke(() -> EventTrackerDUnitTest.getCacheServerPort());
+    final String hostName = NetworkUtils.getServerHostName(host);
+    clientVM.invoke(new CacheSerializableRunnable("Create client") {
+      public void run2() throws CacheException {
+        getCache();
+        AttributesFactory factory = new AttributesFactory();
+        factory.setScope(Scope.LOCAL);
+        ClientServerTestCase.configureConnectionPool(factory, hostName, port, -1, false, -1, -1,
+            null);
+        createRegion(regionName, factory.create());
+      }
+    });
+
+    // Do puts in the client
+    clientVM.invoke(new CacheSerializableRunnable("Do puts") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        for (int i = 0; i < 10; i++) {
+          region.put(i, i);
+        }
+      }
+    });
+
+    // Verify tracker in server contains an entry for client thread
+    serverVM.invoke(new CacheSerializableRunnable("Verify event state") {
+      public void run2() throws CacheException {
+        LocalRegion region = (LocalRegion) getRootRegion().getSubregion(regionName);
+        Map eventState = region.getEventState();
+        assertEquals(1, eventState.size());
+      }
+    });
+  }
+
+  /**
+   * Tests adding events to and removing events from an <code>EventTracker</code>.
+   */
+  @Test
+  public void testEventTrackerAddRemoveThreadIdentifier() throws CacheException {
+    Host host = Host.getHost(0);
+    VM serverVM = host.getVM(0);
+    VM clientVM = host.getVM(1);
+    final String regionName = getName();
+
+    // Create Region in the server and verify tracker is created
+    serverVM.invoke(new CacheSerializableRunnable("Create server") {
+      public void run2() throws CacheException {
+        // Set the message tracking timeout
+        System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "messageTrackingTimeout",
+            MESSAGE_TRACKING_TIMEOUT);
+
+        // Create a distributed Region
+        AttributesFactory factory = new AttributesFactory();
+        factory.setScope(Scope.DISTRIBUTED_ACK);
+        LocalRegion region = (LocalRegion) createRegion(regionName, factory.create());
+
+        // Verify an EventTracker is created
+        EventTracker eventTracker = region.getEventTracker();
+        assertNotNull(eventTracker);
+        try {
+          startCacheServer();
+        } catch (Exception ex) {
+          Assert.fail("While starting CacheServer", ex);
+        }
+      }
+    });
+
+    // Verify tracker in server contains no entries
+    serverVM.invoke(new CacheSerializableRunnable("Verify empty event state") {
+      public void run2() throws CacheException {
+        LocalRegion region = (LocalRegion) getRootRegion().getSubregion(regionName);
+        Map eventState = region.getEventState();
+        assertEquals(0, eventState.size());
+      }
+    });
+
+    // Create the Region in the client
+    final int port = serverVM.invoke(() -> EventTrackerDUnitTest.getCacheServerPort());
+    final String hostName = NetworkUtils.getServerHostName(host);
+    clientVM.invoke(new CacheSerializableRunnable("Create client") {
+      public void run2() throws CacheException {
+        getCache();
+        AttributesFactory factory = new AttributesFactory();
+        factory.setScope(Scope.LOCAL);
+        ClientServerTestCase.configureConnectionPool(factory, hostName, port, -1, false, -1, -1,
+            null);
+        createRegion(regionName, factory.create());
+      }
+    });
+
+    // Do puts in the client
+    clientVM.invoke(new CacheSerializableRunnable("Do puts") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        for (int i = 0; i < 10; i++) {
+          region.put(i, i);
+        }
+      }
+    });
+
+    // Verify tracker in server
+    serverVM.invoke(new CacheSerializableRunnable("Verify event state") {
+      public void run2() throws CacheException {
+        // First verify it contains an entry
+        LocalRegion region = (LocalRegion) getRootRegion().getSubregion(regionName);
+        Map eventState = region.getEventState();
+        assertEquals(1, eventState.size());
+
+        // Pause for the message tracking timeout
+        int waitTime = Integer.parseInt(MESSAGE_TRACKING_TIMEOUT) * 3;
+        Wait.pause(waitTime);
+
+        // Verify the server no longer contains an entry
+        eventState = region.getEventState();
+        assertEquals(0, eventState.size());
+      }
+    });
+  }
+
+  /**
+   * Test to make sure we don't leak put all events in the event tracker after multiple putAlls
+   */
+  @Test
+  public void testPutAllHoldersInEventTracker() {
+    Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    VM vm2 = host.getVM(2);
+
+    SerializableRunnable createRegion = new SerializableRunnable("createRegion") {
+
+      public void run() {
+        Cache cache = getCache();
+        RegionFactory<Object, Object> rf =
+            cache.createRegionFactory(RegionShortcut.PARTITION_REDUNDANT);
+        PartitionAttributesFactory paf = new PartitionAttributesFactory();
+        paf.setRedundantCopies(1);
+        paf.setTotalNumBuckets(3);
+        rf.setPartitionAttributes(paf.create());
+        rf.setConcurrencyChecksEnabled(true);
+        rf.create("partitioned");
+
+        rf = cache.createRegionFactory(RegionShortcut.REPLICATE);
+        rf.setConcurrencyChecksEnabled(true);
+        rf.create("replicate");
+        try {
+          startCacheServer();
+        } catch (Exception ex) {
+          Assert.fail("While starting CacheServer", ex);
+        }
+      }
+    };
+
+    vm0.invoke(createRegion);
+    vm1.invoke(createRegion);
+
+    // Create the Regions in the client
+    final int port = vm0.invoke(() -> EventTrackerDUnitTest.getCacheServerPort());
+    final String hostName = NetworkUtils.getServerHostName(host);
+    vm2.invoke(new CacheSerializableRunnable("Create client") {
+      public void run2() throws CacheException {
+        getCache();
+        AttributesFactory factory = new AttributesFactory();
+        factory.setScope(Scope.LOCAL);
+        ClientServerTestCase.configureConnectionPool(factory, hostName, port, -1, false, -1, -1,
+            null);
+        createRootRegion("partitioned", factory.create());
+        createRootRegion("replicate", factory.create());
+      }
+    });
+
+    doTwoPutAlls(vm2, "partitioned");
+    doTwoPutAlls(vm2, "replicate");
+
+    // Make sure that the event tracker for each bucket only records the last
+    // event.
+    checkBucketEventTracker(vm0, 0, 3);
+    checkBucketEventTracker(vm1, 0, 3);
+    checkBucketEventTracker(vm0, 1, 3);
+    checkBucketEventTracker(vm1, 1, 3);
+    checkBucketEventTracker(vm0, 2, 3);
+    checkBucketEventTracker(vm1, 2, 3);
+
+    checkReplicateEventTracker(vm0, 9);
+    checkReplicateEventTracker(vm1, 9);
+  }
+
+  /** Performs two putAlls of 9 entries each (disjoint key ranges) in the given VM. */
+  private void doTwoPutAlls(VM vm, final String regionName) {
+    SerializableRunnable createData = new SerializableRunnable("putAlls") {
+
+      public void run() {
+        Cache cache = getCache();
+        Region region = cache.getRegion(regionName);
+
+        Map putAllMap = new HashMap();
+        for (int i = 0; i < 9; i++) {
+          putAllMap.put(i, i);
+        }
+        region.putAll(putAllMap);
+
+        putAllMap.clear();
+        for (int i = 10; i < 19; i++) {
+          putAllMap.put(i, i);
+        }
+        region.putAll(putAllMap);
+      }
+    };
+
+    vm.invoke(createData);
+  }
+
+  /** Asserts the replicate region's tracker retains only the last putAll's version tags. */
+  private void checkReplicateEventTracker(VM vm, final int expectedEntryCount) {
+    vm.invoke(new SerializableRunnable("checkEventTracker") {
+
+      public void run() {
+        Cache cache = getCache();
+        DistributedRegion region = (DistributedRegion) cache.getRegion("replicate");
+        checkEventTracker(region, expectedEntryCount);
+      }
+    });
+  }
+
+  /** Asserts the given bucket's tracker retains only the last putAll's version tags. */
+  private void checkBucketEventTracker(VM vm, final int bucketNumber,
+      final int expectedEntryCount) {
+    vm.invoke(new SerializableRunnable("checkEventTracker") {
+
+      public void run() {
+        Cache cache = getCache();
+        PartitionedRegion region = (PartitionedRegion) cache.getRegion("partitioned");
+        BucketRegion br = region.getBucketRegion(bucketNumber);
+
+        checkEventTracker(br, expectedEntryCount);
+      }
+    });
+  }
+
+  private void checkEventTracker(LocalRegion region, int numberOfEvents) {
+    EventTracker tracker = region.getEventTracker();
+    ConcurrentMap<ThreadIdentifier, BulkOperationHolder> memberToTags =
+        tracker.getRecordedBulkOpVersionTags();
+    assertEquals("memberToTags=" + memberToTags, 1, memberToTags.size());
+    BulkOperationHolder holder = memberToTags.values().iterator().next();
+    // We expect the holder to retain only the last putAll that was performed.
+    assertEquals("entryToVersionTags=" + holder.getEntryVersionTags(), numberOfEvents,
+        holder.getEntryVersionTags().size());
+  }
+
+  /** Starts a CacheServer on an ephemeral port and records the port for clients to use. */
+  protected void startCacheServer() throws IOException {
+    CacheServer cacheServer = getCache().addCacheServer();
+    cacheServer.setPort(0);
+    cacheServer.start();
+    cacheServerPort = cacheServer.getPort();
+  }
+
+  protected static int getCacheServerPort() {
+    return cacheServerPort;
+  }
+
+  /**
+   * Tests event tracker is initialized after gii
+   */
+  @Test
+  public void testEventTrackerIsInitialized() throws CacheException {
+    Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    VM vm2 = host.getVM(2);
+
+    createPRInVMs(vm0, vm1, vm2);
+
+    createPR();
+
+    doPutsInVMs(vm0, vm1, vm2);
+
+    doPuts();
+
+    verifyEventTrackerContent();
+
+    // close the region
+    getCache().getRegion(getName()).close();
+
+    // create the region again.
+    createPR();
+
+    for (int i = 0; i < 12; i++) {
+      waitEntryIsLocal(i);
+    }
+
+    // verify event track initialized after create region
+    verifyEventTrackerContent();
+
+  }
+
+  /** Waits until entry {@code i} is visible locally (GII complete for that entry). */
+  private void waitEntryIsLocal(int i) {
+    Awaitility.await().pollInterval(10, TimeUnit.MILLISECONDS).pollDelay(10, TimeUnit.MILLISECONDS)
+        .atMost(30, TimeUnit.SECONDS)
+        .until(() -> getCache().getRegion(getName()).getEntry(i) != null);
+  }
+
+  private void verifyEventTrackerContent() {
+    PartitionedRegion pr = (PartitionedRegion) getCache().getRegion(getName());
+    BucketRegion br = pr.getDataStore().getLocalBucketById(0);
+    Map<?, ?> eventStates = br.getEventState();
+    // one event-state entry per member that did puts (3 remote VMs + this one)
+    assertEquals(4, eventStates.size());
+  }
+
+  public void createPRInVMs(VM... vms) {
+    for (VM vm : vms) {
+      vm.invoke(() -> createPR());
+    }
+  }
+
+  private void createPR() {
+    PartitionAttributesFactory paf =
+        new PartitionAttributesFactory().setRedundantCopies(3).setTotalNumBuckets(4);
+    RegionFactory fact = getCache().createRegionFactory(RegionShortcut.PARTITION)
+        .setPartitionAttributes(paf.create());
+    fact.create(getName());
+  }
+
+  public void doPutsInVMs(VM... vms) {
+    for (VM vm : vms) {
+      vm.invoke(() -> doPuts());
+    }
+  }
+
+  private void doPuts() {
+    Region region = getCache().getRegion(getName());
+    for (int i = 0; i < 12; i++) {
+      region.put(i, i);
+    }
+  }
+}


Mime
View raw message