geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kl...@apache.org
Subject [14/49] geode git commit: GEODE-2632: change dependencies on GemFireCacheImpl to InternalCache
Date Mon, 08 May 2017 23:05:50 GMT
http://git-wip-us.apache.org/repos/asf/geode/blob/8c2210db/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index 82e6f68..f09bb47 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -56,7 +56,6 @@ import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl;
 import org.apache.geode.distributed.internal.DM;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.Version;
 import org.apache.geode.internal.cache.AbstractBucketRegionQueue;
 import org.apache.geode.internal.cache.BucketNotFoundException;
 import org.apache.geode.internal.cache.BucketRegion;
@@ -67,7 +66,7 @@ import org.apache.geode.internal.cache.DiskRegionStats;
 import org.apache.geode.internal.cache.DistributedRegion;
 import org.apache.geode.internal.cache.EntryEventImpl;
 import org.apache.geode.internal.cache.ForceReattemptException;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.InternalRegionArguments;
 import org.apache.geode.internal.cache.LocalRegion;
 import org.apache.geode.internal.cache.PartitionedRegion;
@@ -81,7 +80,6 @@ import org.apache.geode.internal.cache.wan.GatewaySenderConfigurationException;
 import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
 import org.apache.geode.internal.cache.wan.GatewaySenderException;
 import org.apache.geode.internal.cache.wan.GatewaySenderStats;
-import org.apache.geode.internal.cache.wan.parallel.ParallelQueueBatchRemovalMessage.ParallelQueueBatchRemovalResponse;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.logging.LoggingThreadGroup;
@@ -119,8 +117,6 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
 
   protected volatile boolean resetLastPeeked = false;
 
-
-
   /**
    * There will be one shadow pr for each of the the PartitionedRegion which has added the
    * GatewaySender Fix for Bug#45917 We maintain a tempQueue to queue events when buckets
are not
@@ -134,8 +130,10 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
    * the secondary nodes to remove the events which have already been dispatched from the
queue.
    */
   public static final int DEFAULT_MESSAGE_SYNC_INTERVAL = 10;
+
   // TODO:REF: how to change the message sync interval ? should it be common for serial and
parallel
   protected static volatile int messageSyncInterval = DEFAULT_MESSAGE_SYNC_INTERVAL;
+
   // TODO:REF: name change for thread, as it appears in the log
   private BatchRemovalThread removalThread = null;
 
@@ -223,16 +221,11 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
   }
 
   final protected int index;
+
   final protected int nDispatcher;
 
   private MetaRegionFactory metaRegionFactory;
 
-  /**
-   * A transient queue to maintain the eventSeqNum of the events that are to be sent to remote
site.
-   * It is cleared when the queue is cleared.
-   */
-  // private final BlockingQueue<Long> eventSeqNumQueue;
-
   public ParallelGatewaySenderQueue(AbstractGatewaySender sender, Set<Region> userRegions,
int idx,
       int nDispatcher) {
     this(sender, userRegions, idx, nDispatcher, new MetaRegionFactory());
@@ -249,7 +242,6 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
     this.sender = sender;
 
     List<Region> listOfRegions = new ArrayList<Region>(userRegions);
-    // eventSeqNumQueue = new LinkedBlockingQueue<Long>();
     Collections.sort(listOfRegions, new Comparator<Region>() {
       @Override
       public int compare(Region o1, Region o2) {
@@ -273,7 +265,6 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
         throw new GatewaySenderConfigurationException(
             LocalizedStrings.ParallelGatewaySender_0_CAN_NOT_BE_USED_WITH_REPLICATED_REGION_1
                 .toLocalizedString(new Object[] {this.sender.getId(), userRegion.getFullPath()}));
-        // addShadowPartitionedRegionForUserRR((DistributedRegion)userRegion);
       }
     }
 
@@ -295,7 +286,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
     // still, it is safer approach to synchronize it
     synchronized (ParallelGatewaySenderQueue.class) {
       if (removalThread == null) {
-        removalThread = new BatchRemovalThread((GemFireCacheImpl) this.sender.getCache(),
this);
+        removalThread = new BatchRemovalThread(this.sender.getCache(), this);
         removalThread.start();
       }
     }
@@ -317,7 +308,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
       if (this.userRegionNameToshadowPRMap.containsKey(regionName))
         return;
 
-      GemFireCacheImpl cache = (GemFireCacheImpl) sender.getCache();
+      InternalCache cache = sender.getCache();
       final String prQName = getQueueName(sender.getId(), userRegion.getFullPath());
       prQ = (PartitionedRegion) cache.getRegion(prQName);
       if (prQ == null) {
@@ -375,8 +366,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
                 prQ.getPartitionAttributes());
           }
 
-          // Suranjan: TODO This should not be set on the PR but on the
-          // GatewaySender
+          // TODO This should not be set on the PR but on the GatewaySender
           prQ.enableConflation(sender.isBatchConflationEnabled());
 
           // Before going ahead, make sure all the buckets of shadowPR are
@@ -391,32 +381,6 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
           }
           // In case of Replicated Region it may not be necessary.
 
-          // if (sender.isPersistenceEnabled()) {
-          // //Kishor: I need to write a test for this code.
-          // Set<Integer> allBucketsClone = new HashSet<Integer>();
-          // // allBucketsClone.addAll(allBuckets);*/
-          // for (int i = 0; i < sender.getMaxParallelismForReplicatedRegion(); i++)
-          // allBucketsClone.add(i);
-          //
-          // while (!(allBucketsClone.size() == 0)) {
-          // Iterator<Integer> itr = allBucketsClone.iterator();
-          // while (itr.hasNext()) {
-          // InternalDistributedMember node = prQ.getNodeForBucketWrite(
-          // itr.next(), null);
-          // if (node != null) {
-          // itr.remove();
-          // }
-          // }
-          // // after the iteration is over, sleep for sometime before trying
-          // // again
-          // try {
-          // Thread.sleep(WAIT_CYCLE_SHADOW_BUCKET_LOAD);
-          // }
-          // catch (InterruptedException e) {
-          // logger.error(e);
-          // }
-          // }
-          // }
         } catch (IOException veryUnLikely) {
           logger.fatal(LocalizedMessage.create(
               LocalizedStrings.SingleWriteSingleReadRegionQueue_UNEXPECTED_EXCEPTION_DURING_INIT_OF_0,
@@ -453,7 +417,6 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
   }
 
   private static String convertPathToName(String fullPath) {
-    // return fullPath.replaceAll("/", "_");
     return "";
   }
 
@@ -490,7 +453,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
                 .toLocalizedString(new Object[] {this.sender.getId(), userPR.getFullPath()}));
       }
 
-      GemFireCacheImpl cache = (GemFireCacheImpl) sender.getCache();
+      InternalCache cache = sender.getCache();
       boolean isAccessor = (userPR.getLocalMaxMemory() == 0);
 
       final String prQName = sender.getId() + QSTRING + convertPathToName(userPR.getFullPath());
@@ -549,7 +512,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
                   .setInternalRegion(true).setSnapshotInputStream(null).setImageTarget(null));
           // at this point we should be able to assert prQ == meta;
 
-          // Suranjan: TODO This should not be set on the PR but on the GatewaySender
+          // TODO This should not be set on the PR but on the GatewaySender
           prQ.enableConflation(sender.isBatchConflationEnabled());
           if (isAccessor)
             return; // return from here if accessor node
@@ -576,7 +539,6 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
 
     } finally {
       if (prQ != null) {
-
         this.userRegionNameToshadowPRMap.put(userPR.getFullPath(), prQ);
       }
       /*
@@ -611,7 +573,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
   }
 
   protected void afterRegionAdd(PartitionedRegion userPR) {
-
+    // nothing
   }
 
   /**
@@ -666,18 +628,12 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
   public boolean put(Object object) throws InterruptedException, CacheException {
     final boolean isDebugEnabled = logger.isDebugEnabled();
     boolean putDone = false;
-    // Suranjan : Can this region ever be null? Should we work with regionName and not with
region
+    // Can this region ever be null? Should we work with regionName and not with region
     // instance.
     // It can't be as put is happeing on the region and its still under process
     GatewaySenderEventImpl value = (GatewaySenderEventImpl) object;
     boolean isDREvent = isDREvent(value);
 
-    // if (isDREvent(value)) {
-    // putInShadowPRForReplicatedRegion(object);
-    // value.freeOffHeapValue();
-    // return;
-    // }
-
     Region region = value.getRegion();
     String regionPath = null;
     if (isDREvent) {
@@ -795,11 +751,6 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
                     brq.getInitializationLock().readLock().unlock();
                   }
                 } else {
-                  // tempQueue = this.bucketToTempQueueMap.get(bucketId);
-                  // if (tempQueue == null) {
-                  // tempQueue = new LinkedBlockingQueue();
-                  // this.bucketToTempQueueMap.put(bucketId, tempQueue);
-                  // }
                   tempQueue.add(value);
                   putDone = true;
                   // For debugging purpose.
@@ -811,7 +762,6 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
                 }
               }
             }
-            // }
           }
 
         } finally {
@@ -873,12 +823,9 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
     try {
       if (brq != null) {
         addedValueToQueue = brq.addToQueue(key, value);
-        // TODO : Kishor : During merge, ParallelWANstats test failed. On
+        // TODO: During merge, ParallelWANstats test failed. On
         // comment below code test passed. cheetha does not have below code.
         // need to find out from hcih revision this code came
-        // if (brq.getBucketAdvisor().isPrimary()) {
-        // this.stats.incQueueSize();
-        // }
       }
     } catch (BucketNotFoundException e) {
       if (logger.isDebugEnabled()) {
@@ -933,18 +880,13 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
     return new HashSet(this.userRegionNameToshadowPRMap.values());
   }
 
-  // TODO: Suranjan Find optimal way to get Random shadow pr as this will be called in each
put and
-  // peek.
+  // TODO: Find optimal way to get Random shadow pr as this will be called in each put and
peek.
   protected PartitionedRegion getRandomShadowPR() {
     PartitionedRegion prQ = null;
     if (this.userRegionNameToshadowPRMap.values().size() > 0) {
       int randomIndex = new Random().nextInt(this.userRegionNameToshadowPRMap.size());
       prQ = (PartitionedRegion) this.userRegionNameToshadowPRMap.values().toArray()[randomIndex];
     }
-    // if (this.userPRToshadowPRMap.values().size() > 0
-    // && (prQ == null)) {
-    // prQ = getRandomShadowPR();
-    // }
     return prQ;
   }
 
@@ -1029,13 +971,6 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
 
       // TODO:REF: instead of shuffle use random number, in this method we are
       // returning id instead we should return BRQ itself
-      /*
-       * Collections.shuffle(thisProcessorBuckets); for (Integer bucketId : thisProcessorBuckets)
{
-       * BucketRegionQueue br = (BucketRegionQueue)prQ.getDataStore()
-       * .getBucketRegionQueueByBucketId(bucketId);
-       *
-       * if (br != null && br.isReadyForPeek()) { return br.getId(); } }
-       */
     }
     return -1;
   }
@@ -1052,9 +987,6 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
 
       GatewaySenderEventImpl event = this.peekedEvents.remove();
       try {
-        // PartitionedRegion prQ = this.userPRToshadowPRMap.get(ColocationHelper
-        // .getLeaderRegion((PartitionedRegion)event.getRegion()).getFullPath());
-        //
         PartitionedRegion prQ = null;
         int bucketId = -1;
         Object key = null;
@@ -1071,11 +1003,10 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
           }
         } else {
           String regionPath = event.getRegionPath();
-          GemFireCacheImpl cache = (GemFireCacheImpl) this.sender.getCache();
+          InternalCache cache = this.sender.getCache();
           Region region = (PartitionedRegion) cache.getRegion(regionPath);
           if (region != null && !region.isDestroyed()) {
-            // TODO: Suranjan We have to get colocated parent region for this
-            // region
+            // TODO: We have to get colocated parent region for this region
             if (region instanceof DistributedRegion) {
               prQ = this.userRegionNameToshadowPRMap.get(region.getFullPath());
               event.getBucketId();
@@ -1105,7 +1036,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
   private void destroyEventFromQueue(PartitionedRegion prQ, int bucketId, Object key) {
     boolean isPrimary = prQ.getRegionAdvisor().getBucketAdvisor(bucketId).isPrimary();
     BucketRegionQueue brq = getBucketRegionQueueByBucketId(prQ, bucketId);
-    // TODO : Kishor : Make sure we dont need to initalize a bucket
+    // TODO : Make sure we dont need to initalize a bucket
     // before destroying a key from it
     try {
       if (brq != null) {
@@ -1261,7 +1192,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
       }
-      blockProcesorThreadIfRequired();
+      blockProcessorThreadIfRequired();
       return batch;
     }
 
@@ -1340,7 +1271,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
           this, batch.size(), size(), localSize());
     }
     if (batch.size() == 0) {
-      blockProcesorThreadIfRequired();
+      blockProcessorThreadIfRequired();
     }
     return batch;
   }
@@ -1400,10 +1331,9 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
     }
   }
 
-  protected void blockProcesorThreadIfRequired() throws InterruptedException {
+  protected void blockProcessorThreadIfRequired() throws InterruptedException {
     queueEmptyLock.lock();
     try {
-      // while (isQueueEmpty) {
       if (isQueueEmpty) { // merge44610: this if condition came from cheetah 44610
         if (logger.isDebugEnabled()) {
           logger.debug("Going to wait, till notified.");
@@ -1414,7 +1344,6 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
         // parameter but cedar does not have such corresponding method
         queueEmptyCondition.await(1000);
         // merge44610: this time waiting came from cheetah 44610
-        // isQueueEmpty = this.localSize() == 0;
       }
       // update the flag so that next time when we come we will block.
       isQueueEmpty = this.localSizeForProcessor() == 0;
@@ -1526,7 +1455,6 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
     throw new UnsupportedOperationException();
   }
 
-
   @Override
   public void remove(int batchSize) throws CacheException {
     for (int i = 0; i < batchSize; i++) {
@@ -1596,14 +1524,6 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
   @Override
   public void close() {
     // Because of bug 49060 do not close the regions of a parallel queue
-    // for (Region r: getRegions()) {
-    // if (r != null && !r.isDestroyed()) {
-    // try {
-    // r.close();
-    // } catch (RegionDestroyedException e) {
-    // }
-    // }
-    // }
   }
 
   /**
@@ -1634,14 +1554,14 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
      */
     private volatile boolean shutdown = false;
 
-    private final GemFireCacheImpl cache;
+    private final InternalCache cache;
 
     private final ParallelGatewaySenderQueue parallelQueue;
 
     /**
      * Constructor : Creates and initializes the thread
      */
-    public BatchRemovalThread(GemFireCacheImpl c, ParallelGatewaySenderQueue queue) {
+    public BatchRemovalThread(InternalCache c, ParallelGatewaySenderQueue queue) {
       super("BatchRemovalThread");
       // TODO:REF: Name for this thread ?
       this.setDaemon(true);
@@ -1772,7 +1692,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
       }
     }
 
-    private Set<InternalDistributedMember> getAllRecipients(GemFireCacheImpl cache,
Map map) {
+    private Set<InternalDistributedMember> getAllRecipients(InternalCache cache, Map
map) {
       Set recipients = new ObjectOpenHashSet();
       for (Object pr : map.keySet()) {
         PartitionedRegion partitionedRegion = (PartitionedRegion) cache.getRegion((String)
pr);
@@ -1811,7 +1731,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
     AbstractGatewaySender sender = null;
 
     public ParallelGatewaySenderQueueMetaRegion(String regionName, RegionAttributes attrs,
-        LocalRegion parentRegion, GemFireCacheImpl cache, AbstractGatewaySender pgSender)
{
+        LocalRegion parentRegion, InternalCache cache, AbstractGatewaySender pgSender) {
       super(regionName, attrs, parentRegion, cache,
           new InternalRegionArguments().setDestroyLockFlag(true).setRecreateFlag(false)
               .setSnapshotInputStream(null).setImageTarget(null)
@@ -1872,8 +1792,8 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
   }
 
   static class MetaRegionFactory {
-    ParallelGatewaySenderQueueMetaRegion newMetataRegion(GemFireCacheImpl cache,
-        final String prQName, final RegionAttributes ra, AbstractGatewaySender sender) {
+    ParallelGatewaySenderQueueMetaRegion newMetataRegion(InternalCache cache, final String
prQName,
+        final RegionAttributes ra, AbstractGatewaySender sender) {
       ParallelGatewaySenderQueueMetaRegion meta =
           new ParallelGatewaySenderQueueMetaRegion(prQName, ra, null, cache, sender);
       return meta;

http://git-wip-us.apache.org/repos/asf/geode/blob/8c2210db/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
index 921af9c..a7eb9e3 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
@@ -38,6 +38,7 @@ import org.apache.geode.distributed.internal.PooledDistributionMessage;
 import org.apache.geode.internal.cache.AbstractBucketRegionQueue;
 import org.apache.geode.internal.cache.ForceReattemptException;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.LocalRegion;
 import org.apache.geode.internal.cache.PartitionedRegion;
 import org.apache.geode.internal.cache.PartitionedRegionHelper;
@@ -52,7 +53,6 @@ import org.apache.geode.internal.logging.log4j.LocalizedMessage;
  * 
  * @since GemFire 8.0
  */
-
 public class ParallelQueueRemovalMessage extends PooledDistributionMessage {
 
   private static final Logger logger = LogService.getLogger();
@@ -73,7 +73,7 @@ public class ParallelQueueRemovalMessage extends PooledDistributionMessage
{
   @Override
   protected void process(DistributionManager dm) {
     final boolean isDebugEnabled = logger.isDebugEnabled();
-    final GemFireCacheImpl cache;
+    final InternalCache cache;
     cache = GemFireCacheImpl.getInstance();
     if (cache != null) {
       int oldLevel = LocalRegion.setThreadInitLevelRequirement(LocalRegion.BEFORE_INITIAL_IMAGE);

http://git-wip-us.apache.org/repos/asf/geode/blob/8c2210db/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
index 02baf81..7928662 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
@@ -54,7 +54,7 @@ import org.apache.geode.internal.cache.CachedDeserializable;
 import org.apache.geode.internal.cache.Conflatable;
 import org.apache.geode.internal.cache.DistributedRegion;
 import org.apache.geode.internal.cache.EntryEventImpl;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.InternalRegionArguments;
 import org.apache.geode.internal.cache.LocalRegion;
 import org.apache.geode.internal.cache.RegionQueue;
@@ -72,7 +72,6 @@ import org.apache.geode.pdx.internal.PeerTypeRegistration;
 
 /**
  * @since GemFire 7.0
- * 
  */
 public class SerialGatewaySenderQueue implements RegionQueue {
 
@@ -208,14 +207,12 @@ public class SerialGatewaySenderQueue implements RegionQueue {
     initializeRegion(abstractSender, listener);
     // Increment queue size. Fix for bug 51988.
     this.stats.incQueueSize(this.region.size());
-    this.removalThread = new BatchRemovalThread((GemFireCacheImpl) abstractSender.getCache());
+    this.removalThread = new BatchRemovalThread(abstractSender.getCache());
     this.removalThread.start();
     this.sender = abstractSender;
     if (logger.isDebugEnabled()) {
       logger.debug("{}: Contains {} elements", this, size());
     }
-
-
   }
 
   public Region<Long, AsyncEvent> getRegion() {
@@ -233,18 +230,8 @@ public class SerialGatewaySenderQueue implements RegionQueue {
         (r instanceof DistributedRegion && r.getName().equals(PeerTypeRegistration.REGION_NAME));
     final boolean isWbcl = this.regionName.startsWith(AsyncEventQueueImpl.ASYNC_EVENT_QUEUE_PREFIX);
     if (!(isPDXRegion && isWbcl)) {
-      // TODO: Kishor : after merging this change. AsyncEventQueue test failed
-      // with data inconsistency. As of now going ahead with sync putandGetKey.
-      // Need to work on this during cedar
-      // if (this.keyPutNoSync) {
-      // putAndGetKeyNoSync(event);
-      // }
-      // else {
-      // synchronized (this) {
       putAndGetKey(event);
       return true;
-      // }
-      // }
     }
     return false;
   }
@@ -366,26 +353,6 @@ public class SerialGatewaySenderQueue implements RegionQueue {
     // If we do want to support it then each caller needs
     // to call freeOffHeapResources and the returned GatewaySenderEventImpl
     throw new UnsupportedOperationException();
-    // resetLastPeeked();
-    // AsyncEvent object = peekAhead();
-    // // If it is not null, destroy it and increment the head key
-    // if (object != null) {
-    // Long key = this.peekedIds.remove();
-    // if (logger.isTraceEnabled()) {
-    // logger.trace("{}: Retrieved {} -> {}",this, key, object);
-    // }
-    // // Remove the entry at that key with a callback arg signifying it is
-    // // a WAN queue so that AbstractRegionEntry.destroy can get the value
-    // // even if it has been evicted to disk. In the normal case, the
-    // // AbstractRegionEntry.destroy only gets the value in the VM.
-    // this.region.destroy(key, RegionQueue.WAN_QUEUE_TOKEN);
-    // updateHeadKey(key.longValue());
-
-    // if (logger.isTraceEnabled()) {
-    // logger.trace("{}: Destroyed {} -> {}", this, key, object);
-    // }
-    // }
-    // return object;
   }
 
   public List<AsyncEvent> take(int batchSize) throws CacheException {
@@ -393,20 +360,6 @@ public class SerialGatewaySenderQueue implements RegionQueue {
     // If we do want to support it then the callers
     // need to call freeOffHeapResources on each returned GatewaySenderEventImpl
     throw new UnsupportedOperationException();
-    // List<AsyncEvent> batch = new ArrayList<AsyncEvent>(
-    // batchSize * 2);
-    // for (int i = 0; i < batchSize; i++) {
-    // AsyncEvent obj = take();
-    // if (obj != null) {
-    // batch.add(obj);
-    // } else {
-    // break;
-    // }
-    // }
-    // if (logger.isTraceEnabled()) {
-    // logger.trace("{}: Took a batch of {} entries", this, batch.size());
-    // }
-    // return batch;
   }
 
   /**
@@ -442,7 +395,7 @@ public class SerialGatewaySenderQueue implements RegionQueue {
     boolean wasEmpty = this.lastDispatchedKey == this.lastDestroyedKey;
     this.lastDispatchedKey = key;
     if (wasEmpty) {
-      this.notify();
+      notifyAll();
     }
 
     if (logger.isDebugEnabled()) {
@@ -468,7 +421,6 @@ public class SerialGatewaySenderQueue implements RegionQueue {
   }
 
   public Object peek() throws CacheException {
-    // resetLastPeeked();
     Object object = peekAhead();
     if (logger.isTraceEnabled()) {
       logger.trace("{}: Peeked {} -> {}", this, peekedIds, object);
@@ -494,7 +446,6 @@ public class SerialGatewaySenderQueue implements RegionQueue {
     }
     List<AsyncEvent> batch = new ArrayList<AsyncEvent>(size * 2); // why
                                                                   // *2?
-    // resetLastPeeked();
     while (batch.size() < size) {
       AsyncEvent object = peekAhead();
       // Conflate here
@@ -725,7 +676,6 @@ public class SerialGatewaySenderQueue implements RegionQueue {
 
   /**
    * Clear the list of peeked keys. The next peek will start again at the head key.
-   * 
    */
   public void resetLastPeeked() {
     this.peekedIds.clear();
@@ -736,7 +686,6 @@ public class SerialGatewaySenderQueue implements RegionQueue {
    * 
    * @throws CacheException
    */
-
   private Long getCurrentKey() {
     long currentKey;
     if (this.peekedIds.isEmpty()) {
@@ -775,7 +724,6 @@ public class SerialGatewaySenderQueue implements RegionQueue {
       return null;
     }
 
-
     // It's important here that we check where the current key
     // is in relation to the tail key before we check to see if the
     // object exists. The reason is that the tail key is basically
@@ -785,7 +733,7 @@ public class SerialGatewaySenderQueue implements RegionQueue {
     // If we check for the object, and then check the tail key, we could
     // skip objects.
 
-    // @todo don't do a get which updates the lru, instead just get the value
+    // TODO: don't do a get which updates the lru, instead just get the value
     // w/o modifying the LRU.
     // Note: getting the serialized form here (if it has overflowed to disk)
     // does not save anything since GatewayBatchOp needs to GatewayEventImpl
@@ -969,7 +917,7 @@ public class SerialGatewaySenderQueue implements RegionQueue {
    */
   @SuppressWarnings({"unchecked", "rawtypes"})
   private void initializeRegion(AbstractGatewaySender sender, CacheListener listener) {
-    final GemFireCacheImpl gemCache = (GemFireCacheImpl) sender.getCache();
+    final InternalCache gemCache = sender.getCache();
     this.region = gemCache.getRegion(this.regionName);
     if (this.region == null) {
       AttributesFactory<Long, AsyncEvent> factory = new AttributesFactory<Long,
AsyncEvent>();
@@ -992,11 +940,9 @@ public class SerialGatewaySenderQueue implements RegionQueue {
       factory.setEvictionAttributes(ea);
       factory.setConcurrencyChecksEnabled(false);
 
-
       factory.setDiskStoreName(this.diskStoreName);
-      // TODO: Suranjan, can we do the following
-      // In case of persistence write to disk sync and in case of eviction
-      // write in async
+
+      // In case of persistence write to disk sync and in case of eviction write in async
       factory.setDiskSynchronous(this.isDiskSynchronous);
 
       // Create the region
@@ -1067,12 +1013,14 @@ public class SerialGatewaySenderQueue implements RegionQueue {
      */
     private volatile boolean shutdown = false;
 
-    private final GemFireCacheImpl cache;
+    private final InternalCache cache;
 
     /**
      * Constructor : Creates and initializes the thread
+     * 
+     * @param c
      */
-    public BatchRemovalThread(GemFireCacheImpl c) {
+    public BatchRemovalThread(InternalCache c) {
       this.setDaemon(true);
       this.cache = c;
     }
@@ -1213,7 +1161,7 @@ public class SerialGatewaySenderQueue implements RegionQueue {
     AbstractGatewaySender sender = null;
 
     protected SerialGatewaySenderQueueMetaRegion(String regionName, RegionAttributes attrs,
-        LocalRegion parentRegion, GemFireCacheImpl cache, AbstractGatewaySender sender) {
+        LocalRegion parentRegion, InternalCache cache, AbstractGatewaySender sender) {
       super(regionName, attrs, parentRegion, cache,
           new InternalRegionArguments().setDestroyLockFlag(true).setRecreateFlag(false)
               .setSnapshotInputStream(null).setImageTarget(null)


Mime
View raw message