geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kl...@apache.org
Subject [18/36] geode git commit: GEODE-2632: change dependencies on GemFireCacheImpl to InternalCache
Date Mon, 01 May 2017 19:49:31 GMT
http://git-wip-us.apache.org/repos/asf/geode/blob/dc820690/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
index bbff29c..b3c23b1 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
@@ -14,19 +14,62 @@
  */
 package org.apache.geode.internal.cache;
 
-import static org.apache.geode.distributed.ConfigurationProperties.CACHE_XML_FILE;
-import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
-import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
+import static org.apache.geode.distributed.ConfigurationProperties.*;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.net.InetAddress;
+import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
 import org.apache.commons.io.FileUtils;
+import org.apache.logging.log4j.Logger;
+
 import org.apache.geode.CancelCriterion;
 import org.apache.geode.CancelException;
 import org.apache.geode.StatisticsFactory;
 import org.apache.geode.SystemFailure;
 import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.CacheClosedException;
+import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.cache.DiskAccessException;
 import org.apache.geode.cache.DiskStore;
 import org.apache.geode.cache.DiskStoreFactory;
@@ -72,55 +115,11 @@ import org.apache.geode.pdx.internal.EnumInfo;
 import org.apache.geode.pdx.internal.PdxField;
 import org.apache.geode.pdx.internal.PdxType;
 import org.apache.geode.pdx.internal.PeerTypeRegistration;
-import org.apache.logging.log4j.Logger;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.net.InetAddress;
-import java.nio.channels.ClosedByInterruptException;
-import java.nio.channels.FileChannel;
-import java.nio.channels.FileLock;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 /**
  * Represents a (disk-based) persistent store for region data. Used for both persistent recoverable
  * regions and overflow-only regions.
- * 
- * 
+ *
  * @since GemFire 3.2
  */
 @SuppressWarnings("synthetic-access")
@@ -128,6 +127,7 @@ public class DiskStoreImpl implements DiskStore {
   private static final Logger logger = LogService.getLogger();
 
   private static final String BACKUP_DIR_PREFIX = "dir";
+
   public static final boolean KRF_DEBUG = Boolean.getBoolean("disk.KRF_DEBUG");
 
   public static final int MAX_OPEN_INACTIVE_OPLOGS =
@@ -166,6 +166,7 @@ public class DiskStoreImpl implements DiskStore {
 
   public static final String RECOVER_VALUE_PROPERTY_NAME =
       DistributionConfig.GEMFIRE_PREFIX + "disk.recoverValues";
+
   public static final String RECOVER_VALUES_SYNC_PROPERTY_NAME =
       DistributionConfig.GEMFIRE_PREFIX + "disk.recoverValuesSync";
 
@@ -177,9 +178,12 @@ public class DiskStoreImpl implements DiskStore {
       DistributionConfig.GEMFIRE_PREFIX + "disk.recoverLruValues";
 
   boolean RECOVER_VALUES = getBoolean(DiskStoreImpl.RECOVER_VALUE_PROPERTY_NAME, true);
+
   boolean RECOVER_VALUES_SYNC = getBoolean(DiskStoreImpl.RECOVER_VALUES_SYNC_PROPERTY_NAME, false);
+
   boolean FORCE_KRF_RECOVERY =
       getBoolean(DistributionConfig.GEMFIRE_PREFIX + "disk.FORCE_KRF_RECOVERY", false);
+
   final boolean RECOVER_LRU_VALUES =
       getBoolean(DiskStoreImpl.RECOVER_LRU_VALUES_PROPERTY_NAME, false);
 
@@ -188,7 +192,9 @@ public class DiskStoreImpl implements DiskStore {
   }
 
   public static final long MIN_RESERVED_DRID = 1;
+
   public static final long MAX_RESERVED_DRID = 8;
+
   static final long MIN_DRID = MAX_RESERVED_DRID + 1;
 
   /**
@@ -205,9 +211,7 @@ public class DiskStoreImpl implements DiskStore {
   private final int MAX_OPLOGS_PER_COMPACTION = Integer.getInteger(
       DistributionConfig.GEMFIRE_PREFIX + "MAX_OPLOGS_PER_COMPACTION",
       Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "MAX_OPLOGS_PER_ROLL", 1).intValue());
-  /**
-   *
-   */
+
   public static final int MAX_CONCURRENT_COMPACTIONS = Integer.getInteger(
       DistributionConfig.GEMFIRE_PREFIX + "MAX_CONCURRENT_COMPACTIONS",
       Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "MAX_CONCURRENT_ROLLS", 1).intValue());
@@ -219,6 +223,7 @@ public class DiskStoreImpl implements DiskStore {
    */
   public static final int MAX_PENDING_TASKS =
       Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "disk.MAX_PENDING_TASKS", 6);
+
   /**
    * This system property indicates that IF should also be preallocated. This property will be used
    * in conjunction with the PREALLOCATE_OPLOGS property. If PREALLOCATE_OPLOGS is ON the below will
@@ -227,6 +232,7 @@ public class DiskStoreImpl implements DiskStore {
   static final boolean PREALLOCATE_IF =
       !System.getProperty(DistributionConfig.GEMFIRE_PREFIX + "preAllocateIF", "true")
           .equalsIgnoreCase("false");
+
   /**
    * This system property indicates that Oplogs should be preallocated till the maxOplogSize as
    * specified for the disk store.
@@ -252,19 +258,14 @@ public class DiskStoreImpl implements DiskStore {
   public static volatile HashSet<String> TEST_CHK_FALLOC_DIRS;
   public static volatile HashSet<String> TEST_NO_FALLOC_DIRS;
 
-  // /** delay for slowing down recovery, for testing purposes only */
-  // public static volatile int recoverDelay = 0;
-
-  // //////////////////// Instance Fields ///////////////////////
-
-  private final GemFireCacheImpl cache;
+  private final InternalCache cache;
 
   /** The stats for this store */
   private final DiskStoreStats stats;
 
   /**
-   * Asif:Added as stop gap arrangement to fix bug 39380. It is not a clean fix as keeping track of
-   * the threads acquiring read lock, etc is not a good idea to solve the issue
+   * Added as stop gap arrangement to fix bug 39380. It is not a clean fix as keeping track of the
+   * threads acquiring read lock, etc is not a good idea to solve the issue
    */
   private final AtomicInteger entryOpsCount = new AtomicInteger();
   /**
@@ -291,10 +292,11 @@ public class DiskStoreImpl implements DiskStore {
    * is forced. If this value is 0 then no limit.
    */
   private final int maxAsyncItems;
+
   private final AtomicInteger forceFlushCount;
+
   private final Object asyncMonitor;
 
-  // complex vars
   /** Compactor task which does the compaction. Null if compaction not possible. */
   private final OplogCompactor oplogCompactor;
 
@@ -303,7 +305,9 @@ public class DiskStoreImpl implements DiskStore {
   private volatile DiskStoreBackup diskStoreBackup = null;
 
   private final ReentrantReadWriteLock compactorLock = new ReentrantReadWriteLock();
+
   private final WriteLock compactorWriteLock = compactorLock.writeLock();
+
   private final ReadLock compactorReadLock = compactorLock.readLock();
 
   /**
@@ -316,37 +320,21 @@ public class DiskStoreImpl implements DiskStore {
       new AtomicReference<DiskAccessException>();
 
   PersistentOplogSet persistentOplogs = new PersistentOplogSet(this);
-  OverflowOplogSet overflowOplogs = new OverflowOplogSet(this);
-
-  // private boolean isThreadWaitingForSpace = false;
-
-  /**
-   * Get the next available dir
-   */
-
-  // /**
-  // * Max timed wait for disk space to become available for an entry operation
-  // ,
-  // * in milliseconds. This will be the maximum time for which a
-  // * create/modify/remove operation will wait so as to allow switch over & get
-  // a
-  // * new Oplog for writing. If no space is available in that time,
-  // * DiskAccessException will be thrown. The default wait will be for 120
-  // * seconds
-  // */
-  // private static final long MAX_WAIT_FOR_SPACE = Integer.getInteger(
-  // "MAX_WAIT_FOR_SPACE", 20).intValue() * 1000;
 
+  OverflowOplogSet overflowOplogs = new OverflowOplogSet(this);
 
   private final AtomicLong regionIdCtr = new AtomicLong(MIN_DRID);
+
   /**
    * Only contains backup DiskRegions. The Value could be a RecoveredDiskRegion or a DiskRegion
    */
   private final ConcurrentMap<Long, DiskRegion> drMap = new ConcurrentHashMap<Long, DiskRegion>();
+
   /**
    * A set of overflow only regions that are using this disk store.
    */
   private final Set<DiskRegion> overflowMap = new ConcurrentHashSet<DiskRegion>();
+
   /**
    * Contains all of the disk recovery stores for which we are recovering values asnynchronously.
    */
@@ -369,9 +357,8 @@ public class DiskStoreImpl implements DiskStore {
   private final ThreadPoolExecutor diskStoreTaskPool;
 
   private final ThreadPoolExecutor delayedWritePool;
-  private volatile Future lastDelayedWrite;
 
-  // ///////////////////// Constructors /////////////////////////
+  private volatile Future lastDelayedWrite;
 
   private static int calcCompactionThreshold(int ct) {
     if (ct == DiskStoreFactory.DEFAULT_COMPACTION_THRESHOLD) {
@@ -387,19 +374,19 @@ public class DiskStoreImpl implements DiskStore {
   }
 
   /**
-   * Creates a new <code>DiskRegion</code> that access disk on behalf of the given region.
+   * Creates a new {@code DiskRegion} that access disk on behalf of the given region.
    */
-  DiskStoreImpl(Cache cache, DiskStoreAttributes props) {
+  DiskStoreImpl(InternalCache cache, DiskStoreAttributes props) {
     this(cache, props, false, null);
   }
 
-  DiskStoreImpl(Cache cache, DiskStoreAttributes props, boolean ownedByRegion,
+  DiskStoreImpl(InternalCache cache, DiskStoreAttributes props, boolean ownedByRegion,
       InternalRegionArguments internalRegionArgs) {
     this(cache, props.getName(), props, ownedByRegion, internalRegionArgs, false,
         false/* upgradeVersionOnly */, false, false, true, false/* offlineModify */);
   }
 
-  DiskStoreImpl(Cache cache, String name, DiskStoreAttributes props, boolean ownedByRegion,
+  DiskStoreImpl(InternalCache cache, String name, DiskStoreAttributes props, boolean ownedByRegion,
       InternalRegionArguments internalRegionArgs, boolean offline, boolean upgradeVersionOnly,
       boolean offlineValidating, boolean offlineCompacting, boolean needsOplogs,
       boolean offlineModify) {
@@ -427,7 +414,7 @@ public class DiskStoreImpl implements DiskStore {
     this.warningPercent = props.getDiskUsageWarningPercentage();
     this.criticalPercent = props.getDiskUsageCriticalPercentage();
 
-    this.cache = (GemFireCacheImpl) cache;
+    this.cache = cache;
     StatisticsFactory factory = cache.getDistributedSystem();
     this.stats = new DiskStoreStats(factory, getName());
 
@@ -474,7 +461,7 @@ public class DiskStoreImpl implements DiskStore {
     this.maxDirSize = tempMaxDirSize * 1024 * 1024;
     this.infoFileDirIndex = 0;
     // Now that we no longer have db files, use all directories for oplogs
-    /**
+    /*
      * The infoFileDir contains the lock file and the init file. It will be directories[0] on a
      * brand new disk store. On an existing disk store it will be the directory the init file is
      * found in.
@@ -495,7 +482,7 @@ public class DiskStoreImpl implements DiskStore {
 
     int MAXT = DiskStoreImpl.MAX_CONCURRENT_COMPACTIONS;
     final ThreadGroup compactThreadGroup =
-        LoggingThreadGroup.createThreadGroup("Oplog Compactor Thread Group", this.logger);
+        LoggingThreadGroup.createThreadGroup("Oplog Compactor Thread Group", logger);
     final ThreadFactory compactThreadFactory =
         GemfireCacheHelper.CreateThreadFactory(compactThreadGroup, "Idle OplogCompactor");
     this.diskStoreTaskPool = new ThreadPoolExecutor(MAXT, MAXT, 10, TimeUnit.SECONDS,
@@ -504,7 +491,7 @@ public class DiskStoreImpl implements DiskStore {
 
 
     final ThreadGroup deleteThreadGroup =
-        LoggingThreadGroup.createThreadGroup("Oplog Delete Thread Group", this.logger);
+        LoggingThreadGroup.createThreadGroup("Oplog Delete Thread Group", logger);
 
     final ThreadFactory deleteThreadFactory =
         GemfireCacheHelper.CreateThreadFactory(deleteThreadGroup, "Oplog Delete Task");
@@ -583,7 +570,7 @@ public class DiskStoreImpl implements DiskStore {
   }
 
   /**
-   * Returns the <code>DiskStoreStats</code> for this store
+   * Returns the {@code DiskStoreStats} for this store
    */
   public DiskStoreStats getStats() {
     return this.stats;
@@ -697,7 +684,7 @@ public class DiskStoreImpl implements DiskStore {
    * @param entry The entry which is going to be written to disk
    * @throws RegionClearedException If a clear operation completed before the put operation
    *         completed successfully, resulting in the put operation to abort.
-   * @throws IllegalArgumentException If <code>id</code> is less than zero
+   * @throws IllegalArgumentException If {@code id} is less than zero
    */
   final void put(LocalRegion region, DiskEntry entry, ValueWrapper value, boolean async)
       throws RegionClearedException {
@@ -886,7 +873,6 @@ public class DiskStoreImpl implements DiskStore {
    * Given a BytesAndBits object convert it to the relevant Object (deserialize if necessary) and
    * return the object
    * 
-   * @param bb
    * @return the converted object
    */
   static Object convertBytesAndBitsIntoObject(BytesAndBits bb) {
@@ -909,7 +895,6 @@ public class DiskStoreImpl implements DiskStore {
   /**
    * Given a BytesAndBits object get the serialized blob
    * 
-   * @param bb
    * @return the converted object
    */
   static Object convertBytesAndBitsToSerializedForm(BytesAndBits bb) {
@@ -1029,7 +1014,7 @@ public class DiskStoreImpl implements DiskStore {
    * HTree with the oplog being destroyed
    * 
    * @return null if entry has nothing stored on disk (id == INVALID_ID)
-   * @throws IllegalArgumentException If <code>id</code> is less than zero, no action is taken.
+   * @throws IllegalArgumentException If {@code id} is less than zero, no action is taken.
    */
   public final Object getNoBuffer(DiskRegion dr, DiskId id) {
     BytesAndBits bb = null;
@@ -1067,8 +1052,8 @@ public class DiskStoreImpl implements DiskStore {
    * 
    * @throws RegionClearedException If a clear operation completed before the put operation
    *         completed successfully, resulting in the put operation to abort.
-   * @throws IllegalArgumentException If <code>id</code> is {@linkplain #INVALID_ID invalid}or is
-   *         less than zero, no action is taken.
+   * @throws IllegalArgumentException If {@code id} is {@linkplain #INVALID_ID invalid}or is less
+   *         than zero, no action is taken.
    */
   final void remove(LocalRegion region, DiskEntry entry, boolean async, boolean isClear)
       throws RegionClearedException {
@@ -1191,7 +1176,7 @@ public class DiskStoreImpl implements DiskStore {
     if (currentOpsInProgress == 0) {
       synchronized (this.closeRegionGuard) {
         if (dr.isRegionClosed() && entryOpsCount.get() == 0) {
-          this.closeRegionGuard.notify();
+          this.closeRegionGuard.notifyAll();
         }
       }
     }
@@ -1237,7 +1222,6 @@ public class DiskStoreImpl implements DiskStore {
   /**
    * Get serialized form of data off the disk
    * 
-   * @param id
    * @since GemFire 5.7
    */
   public Object getSerializedData(DiskRegion dr, DiskId id) {
@@ -1269,7 +1253,7 @@ public class DiskStoreImpl implements DiskStore {
         DiskEntry entry = ade.de;
         DiskEntry.Helper.handleFullAsyncQueue(entry, region, tag);
       }
-    } catch (RegionDestroyedException ex) {
+    } catch (RegionDestroyedException ignore) {
       // Normally we flush before closing or destroying a region
       // but in some cases it is closed w/o flushing.
       // So just ignore it; see bug 41305.
@@ -1397,8 +1381,7 @@ public class DiskStoreImpl implements DiskStore {
   private int fillDrainList() {
     synchronized (this.drainSync) {
       this.drainList = new ArrayList(asyncQueue.size());
-      int drainCount = asyncQueue.drainTo(this.drainList);
-      return drainCount;
+      return asyncQueue.drainTo(this.drainList);
     }
   }
 
@@ -1410,8 +1393,6 @@ public class DiskStoreImpl implements DiskStore {
    * To fix bug 41770 clear the list in a way that will not break a concurrent iterator that is not
    * synced on drainSync. Only clear from it entries on the given region. Currently we do this by
    * clearing the isPendingAsync bit on each entry in this list.
-   * 
-   * @param rvv
    */
   void clearDrainList(LocalRegion r, RegionVersionVector rvv) {
     synchronized (this.drainSync) {
@@ -1516,7 +1497,7 @@ public class DiskStoreImpl implements DiskStore {
     try {
       this.flusherThread.join(waitMs);
       return true;
-    } catch (InterruptedException ie) {
+    } catch (InterruptedException ignore) {
       Thread.currentThread().interrupt();
     }
     return false;
@@ -1532,7 +1513,7 @@ public class DiskStoreImpl implements DiskStore {
     }
   }
 
-  public GemFireCacheImpl getCache() {
+  public InternalCache getCache() {
     return this.cache;
   }
 
@@ -1759,7 +1740,7 @@ public class DiskStoreImpl implements DiskStore {
                       }
                     }
                   } // else
-                } catch (RegionDestroyedException ex) {
+                } catch (RegionDestroyedException ignore) {
                   // Normally we flush before closing or destroying a region
                   // but in some cases it is closed w/o flushing.
                   // So just ignore it; see bug 41305.
@@ -2050,18 +2031,8 @@ public class DiskStoreImpl implements DiskStore {
     return this.directories[this.infoFileDirIndex];
   }
 
-  /** For Testing * */
-  // void addToOplogSet(int oplogID, File opFile, DirectoryHolder dirHolder) {
-  // Oplog oplog = new Oplog(oplogID, this);
-  // oplog.addRecoveredFile(opFile, dirHolder);
-  // // @todo check callers to see if they need drf support
-  // this.oplogSet.add(oplog);
-  // }
-
-  /** For Testing * */
   /**
    * returns the size of the biggest directory available to the region
-   * 
    */
   public long getMaxDirSize() {
     return maxDirSize;
@@ -2143,8 +2114,6 @@ public class DiskStoreImpl implements DiskStore {
 
   /**
    * Removes anything found in the async queue for the given region
-   * 
-   * @param rvv
    */
   private void clearAsyncQueue(LocalRegion region, boolean needsWriteLock,
       RegionVersionVector rvv) {
@@ -2263,7 +2232,7 @@ public class DiskStoreImpl implements DiskStore {
     if (diskException.get() != null) {
       try {
         _testHandleDiskAccessException.await();
-      } catch (InterruptedException e) {
+      } catch (InterruptedException ignore) {
         Thread.currentThread().interrupt();
       }
     }
@@ -2466,25 +2435,26 @@ public class DiskStoreImpl implements DiskStore {
               dr.setRegionClosed(true);
             }
             gotLock = true;
-          } catch (CancelException e) {
+          } catch (CancelException ignore) {
             synchronized (this.closeRegionGuard) {
               if (!dr.isRegionClosed()) {
                 if (!closeDataOnly) {
                   dr.setRegionClosed(true);
                 }
-                // Asif: I am quite sure that it should also be Ok if instead
+                // I am quite sure that it should also be Ok if instead
                 // while it is a If Check below. Because if acquireReadLock
                 // thread
-                // has acquired thelock, it is bound to see the isRegionClose as
+                // has acquired the lock, it is bound to see the isRegionClose as
                 // true
-                // and so will realse teh lock causing decrement to zeo , before
+                // and so will release the lock causing decrement to zero , before
                 // releasing the closeRegionGuard. But still...not to take any
                 // chance
 
                 while (this.entryOpsCount.get() > 0) {
                   try {
+                    // TODO: calling wait while holding two locks
                     this.closeRegionGuard.wait(20000);
-                  } catch (InterruptedException ie) {
+                  } catch (InterruptedException ignored) {
                     // Exit without closing the region, do not know what else
                     // can be done
                     Thread.currentThread().interrupt();
@@ -2534,8 +2504,6 @@ public class DiskStoreImpl implements DiskStore {
   /**
    * stops the compactor outside the write lock. Once stopped then it proceeds to destroy the
    * current & old oplogs
-   * 
-   * @param dr
    */
   void beginDestroyRegion(LocalRegion region, DiskRegion dr) {
     if (dr.isBackup()) {
@@ -2571,7 +2539,7 @@ public class DiskStoreImpl implements DiskStore {
           while (this.backgroundTasks.get() > 0) {
             try {
               this.backgroundTasks.wait(500L);
-            } catch (InterruptedException ex) {
+            } catch (InterruptedException ignore) {
               interrupted = true;
             }
           }
@@ -2720,7 +2688,7 @@ public class DiskStoreImpl implements DiskStore {
       return null;
     }
 
-    return l.toArray(new CompactableOplog[0]);
+    return l.toArray(new CompactableOplog[l.size()]);
   }
 
   /**
@@ -2745,7 +2713,6 @@ public class DiskStoreImpl implements DiskStore {
    * @param baselineCopyMap this will be populated with baseline oplogs Files that will be used in
    *        the restore script.
    * @return an array of Oplogs to be copied for an incremental backup.
-   * @throws IOException
    */
   private Oplog[] filterBaselineOplogs(BackupInspector baselineInspector,
       Map<File, File> baselineCopyMap) throws IOException {
@@ -2796,11 +2763,9 @@ public class DiskStoreImpl implements DiskStore {
     }
 
     // Convert the filtered oplog list to an array
-    return oplogList.toArray(new Oplog[] {});
+    return oplogList.toArray(new Oplog[oplogList.size()]);
   }
 
-
-
   /**
    * Get all of the oplogs
    */
@@ -3013,7 +2978,7 @@ public class DiskStoreImpl implements DiskStore {
       while (this.scheduled) {
         try {
           wait();
-        } catch (InterruptedException ex) {
+        } catch (InterruptedException ignore) {
           Thread.currentThread().interrupt();
         }
       }
@@ -3114,30 +3079,13 @@ public class DiskStoreImpl implements DiskStore {
         if (dr.isRegionClosed()) {
           return;
         }
-        // // Stop the compactor if running, without taking lock.
-        // if (this.oplogCompactor != null) {
-        // try {
-        // this.oplogCompactor.stopCompactor();
-        // }
-        // catch (CancelException ignore) {
-        // // Asif:To fix Bug 39380 , ignore the cache closed exception here.
-        // // allow it to call super .close so that it would be able to close
-        // the
-        // // oplogs
-        // // Though I do not think this exception will be thrown by
-        // // the stopCompactor. Still not taking chance and ignoring it
-
-        // }
-        // }
-        // // if (!isSync()) {
-        // stopAsyncFlusher(true); // do this before writeLock
-        // // }
+
         boolean gotLock = false;
         try {
           try {
             acquireWriteLock(dr);
             gotLock = true;
-          } catch (CancelException e) {
+          } catch (CancelException ignore) {
             // see workaround below.
           }
 
@@ -3163,8 +3111,9 @@ public class DiskStoreImpl implements DiskStore {
                 }
                 boolean interrupted = Thread.interrupted();
                 try {
+                  // TODO: calling wait while holding two locks
                   this.closeRegionGuard.wait(1000);
-                } catch (InterruptedException ie) {
+                } catch (InterruptedException ignore) {
                   interrupted = true;
                 } finally {
                   if (interrupted) {
@@ -3175,7 +3124,7 @@ public class DiskStoreImpl implements DiskStore {
               if (this.entryOpsCount.get() > 0) {
                 logger.warn(LocalizedMessage.create(
                     LocalizedStrings.DisKRegion_OUTSTANDING_OPS_REMAIN_AFTER_0_SECONDS_FOR_DISK_REGION_1,
-                    new Object[] {Integer.valueOf(loopCount), dr.getName()}));
+                    new Object[] {loopCount, dr.getName()}));
 
                 for (;;) {
                   if (this.entryOpsCount.get() == 0) {
@@ -3183,8 +3132,9 @@ public class DiskStoreImpl implements DiskStore {
                   }
                   boolean interrupted = Thread.interrupted();
                   try {
+                    // TODO: calling wait while holding two locks
                     this.closeRegionGuard.wait(1000);
-                  } catch (InterruptedException ie) {
+                  } catch (InterruptedException ignore) {
                     interrupted = true;
                   } finally {
                     if (interrupted) {
@@ -3233,7 +3183,7 @@ public class DiskStoreImpl implements DiskStore {
       dr.resetRVV();
       dr.setRVVTrusted(false);
       dr.writeRVV(null, null); // just persist the empty rvv with trust=false
-    } catch (RegionDestroyedException rde) {
+    } catch (RegionDestroyedException ignore) {
       // ignore a RegionDestroyedException at this stage
     }
     if (this.initFile != null && dr.isBackup()) {
@@ -4111,11 +4061,6 @@ public class DiskStoreImpl implements DiskStore {
    * Start the backup process. This is the second step of the backup process. In this method, we
    * define the data we're backing up by copying the init file and rolling to the next file. After
    * this method returns operations can proceed as normal, except that we don't remove oplogs.
-   * 
-   * @param targetDir
-   * @param baselineInspector
-   * @param restoreScript
-   * @throws IOException
    */
   public void startBackup(File targetDir, BackupInspector baselineInspector,
       RestoreScript restoreScript) throws IOException {
@@ -4130,7 +4075,7 @@ public class DiskStoreImpl implements DiskStore {
         }
 
         // Get an appropriate lock object for each set of oplogs.
-        Object childLock = childOplog.lock;;
+        Object childLock = childOplog.lock;
 
         // TODO - We really should move this lock into the disk store, but
         // until then we need to do this magic to make sure we're actually
@@ -4201,9 +4146,6 @@ public class DiskStoreImpl implements DiskStore {
   /**
    * Copy the oplogs to the backup directory. This is the final step of the backup process. The
    * oplogs we copy are defined in the startBackup method.
-   * 
-   * @param backupManager
-   * @throws IOException
    */
   public void finishBackup(BackupManager backupManager) throws IOException {
     if (diskStoreBackup == null) {
@@ -4312,17 +4254,17 @@ public class DiskStoreImpl implements DiskStore {
     props.setProperty(CACHE_XML_FILE, "");
     DistributedSystem ds = DistributedSystem.connect(props);
     offlineDS = ds;
-    Cache c = org.apache.geode.cache.CacheFactory.create(ds);
-    offlineCache = c;
-    org.apache.geode.cache.DiskStoreFactory dsf = c.createDiskStoreFactory();
+    InternalCache cache = (InternalCache) CacheFactory.create(ds);
+    offlineCache = cache;
+    DiskStoreFactory dsf = cache.createDiskStoreFactory();
     dsf.setDiskDirs(dsDirs);
     if (offlineCompacting && maxOplogSize != -1L) {
       dsf.setMaxOplogSize(maxOplogSize);
     }
-    DiskStoreImpl dsi = new DiskStoreImpl(c, dsName,
+    DiskStoreImpl dsi = new DiskStoreImpl(cache, dsName,
         ((DiskStoreFactoryImpl) dsf).getDiskStoreAttributes(), false, null, true,
         upgradeVersionOnly, offlineValidate, offlineCompacting, needsOplogs, offlineModify);
-    ((GemFireCacheImpl) c).addDiskStore(dsi);
+    cache.addDiskStore(dsi);
     return dsi;
   }
 
@@ -4536,7 +4478,7 @@ public class DiskStoreImpl implements DiskStore {
       while (!isClosing() && currentAsyncValueRecoveryMap.containsKey(diskRegion.getId())) {
         try {
           currentAsyncValueRecoveryMap.wait();
-        } catch (InterruptedException e) {
+        } catch (InterruptedException ignore) {
           interrupted = true;
         }
       }
@@ -4591,9 +4533,9 @@ public class DiskStoreImpl implements DiskStore {
     if (lastWriteTask != null) {
       try {
         lastWriteTask.get();
-      } catch (InterruptedException e) {
+      } catch (InterruptedException ignore) {
         Thread.currentThread().interrupt();
-      } catch (Exception e) {
+      } catch (Exception ignore) {
         // do nothing, an exception from the write task was already logged.
       }
     }
@@ -4684,7 +4626,7 @@ public class DiskStoreImpl implements DiskStore {
     delayedWritePool.shutdown();
     try {
       delayedWritePool.awaitTermination(1, TimeUnit.SECONDS);
-    } catch (InterruptedException e) {
+    } catch (InterruptedException ignore) {
       Thread.currentThread().interrupt();
     }
   }

http://git-wip-us.apache.org/repos/asf/geode/blob/dc820690/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreMonitor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreMonitor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreMonitor.java
index 551f733..ac72361 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreMonitor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreMonitor.java
@@ -14,6 +14,19 @@
  */
 package org.apache.geode.internal.cache;
 
+import java.io.File;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.logging.log4j.Logger;
+
 import org.apache.geode.cache.DiskAccessException;
 import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
@@ -22,25 +35,16 @@ import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.logging.LoggingThreadGroup;
 import org.apache.geode.internal.logging.log4j.LocalizedMessage;
 import org.apache.geode.internal.logging.log4j.LogMarker;
-import org.apache.logging.log4j.Logger;
-
-import java.io.File;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.*;
 
 public class DiskStoreMonitor {
   private static final Logger logger = LogService.getLogger();
 
   private static final boolean DISABLE_MONITOR =
       Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "DISK_USAGE_DISABLE_MONITORING");
-  // private static final boolean AUTO_RECONNECT =
-  // Boolean.getBoolean("gemfire.DISK_USAGE_ENABLE_AUTO_RECONNECT");
 
   private static final int USAGE_CHECK_INTERVAL = Integer
       .getInteger(DistributionConfig.GEMFIRE_PREFIX + "DISK_USAGE_POLLING_INTERVAL_MILLIS", 10000);
+
   private static final float LOG_WARNING_THRESHOLD_PCT =
       Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "DISK_USAGE_LOG_WARNING_PERCENT", 99);
 
@@ -67,7 +71,7 @@ public class DiskStoreMonitor {
     if (val < 0 || val > 100) {
       throw new IllegalArgumentException(
           LocalizedStrings.DiskWriteAttributesFactory_DISK_USAGE_WARNING_INVALID_0
-              .toLocalizedString(Float.valueOf(val)));
+              .toLocalizedString(val));
     }
   }
 
@@ -80,17 +84,15 @@ public class DiskStoreMonitor {
     if (val < 0 || val > 100) {
       throw new IllegalArgumentException(
           LocalizedStrings.DiskWriteAttributesFactory_DISK_USAGE_CRITICAL_INVALID_0
-              .toLocalizedString(Float.valueOf(val)));
+              .toLocalizedString(val));
     }
   }
 
   private final ScheduledExecutorService exec;
 
   private final Map<DiskStoreImpl, Set<DirectoryHolderUsage>> disks;
-  private final LogUsage logDisk;
 
-  // // this is set when we go into auto_reconnect mode
-  // private volatile DirectoryHolderUsage criticalDisk;
+  private final LogUsage logDisk;
 
   volatile DiskStateAction _testAction;
 
@@ -209,9 +211,9 @@ public class DiskStoreMonitor {
 
   private File getLogDir() {
     File log = null;
-    GemFireCacheImpl gci = GemFireCacheImpl.getInstance();
-    if (gci != null) {
-      InternalDistributedSystem ds = gci.getInternalDistributedSystem();
+    InternalCache internalCache = GemFireCacheImpl.getInstance();
+    if (internalCache != null) {
+      InternalDistributedSystem ds = internalCache.getInternalDistributedSystem();
       if (ds != null) {
         DistributionConfig conf = ds.getConfig();
         if (conf != null) {
@@ -230,7 +232,7 @@ public class DiskStoreMonitor {
     return log;
   }
 
-  abstract class DiskUsage {
+  abstract static class DiskUsage {
     private DiskState state;
 
     DiskUsage() {
@@ -305,7 +307,7 @@ public class DiskStoreMonitor {
     protected abstract void handleStateChange(DiskState next, String pct);
   }
 
-  class LogUsage extends DiskUsage {
+  static class LogUsage extends DiskUsage {
     private final File dir;
 
     public LogUsage(File dir) {
@@ -382,41 +384,12 @@ public class DiskStoreMonitor {
           logger.error(LogMarker.DISK_STORE_MONITOR,
               LocalizedMessage.create(LocalizedStrings.DiskStoreMonitor_DISK_CRITICAL, args));
 
-          try {
-            // // prepare for restart
-            // if (AUTO_RECONNECT) {
-            // disk.getCache().saveCacheXmlForReconnect();
-            // criticalDisk = this;
-            // }
-          } finally {
-            // pull the plug
-            disk.handleDiskAccessException(new DiskAccessException(msg, disk));
-          }
+          // TODO: this is weird...
+          disk.handleDiskAccessException(new DiskAccessException(msg, disk));
           break;
       }
     }
 
-    // private void performReconnect(String msg) {
-    // try {
-    // // don't try to reconnect before the cache is closed
-    // disk._testHandleDiskAccessException.await();
-    //
-    // // now reconnect, clear out the var first so a close can interrupt the
-    // // reconnect
-    // criticalDisk = null;
-    // boolean restart = disk.getCache().getDistributedSystem().tryReconnect(true, msg,
-    // disk.getCache());
-    // if (LogMarker.DISK_STORE_MONITOR || logger.isDebugEnabled()) {
-    // String pre = restart ? "Successfully" : "Unsuccessfully";
-    // logger.info(LocalizedStrings.DEBUG, pre + " attempted to restart cache");
-    // }
-    // } catch (InterruptedException e) {
-    // Thread.currentThread().interrupt();
-    // } finally {
-    // close();
-    // }
-    // }
-
     @Override
     protected File dir() {
       return dir.getDir();

http://git-wip-us.apache.org/repos/asf/geode/blob/dc820690/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXCommitMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXCommitMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXCommitMessage.java
index 36ad9ce..e22e1d9 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXCommitMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXCommitMessage.java
@@ -48,12 +48,10 @@ import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.logging.log4j.LogMarker;
 
-/**
- * 
- */
 public class DistTXCommitMessage extends TXMessage {
 
   private static final Logger logger = LogService.getLogger();
+
   protected ArrayList<ArrayList<DistTxThinEntryState>> entryStateList = null;
 
   /** for deserialization */
@@ -75,7 +73,7 @@ public class DistTXCommitMessage extends TXMessage {
       logger.debug("DistTXCommitMessage.operateOnTx: Tx {}", txId);
     }
 
-    GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+    InternalCache cache = GemFireCacheImpl.getInstance();
     TXManagerImpl txMgr = cache.getTXMgr();
     final TXStateProxy txStateProxy = txMgr.getTXState();
     TXCommitMessage cmsg = null;
@@ -256,7 +254,7 @@ public class DistTXCommitMessage extends TXMessage {
 
     @Override
     public String toString() {
-      StringBuffer sb = new StringBuffer();
+      StringBuilder sb = new StringBuilder();
       sb.append("DistTXCommitPhaseTwoReplyMessage ").append("processorid=").append(this.processorId)
           .append(" reply to sender ").append(this.getSender());
       return sb.toString();
@@ -339,7 +337,7 @@ public class DistTXCommitMessage extends TXMessage {
             (DistTxCommitExceptionCollectingException) this.exception;
         return cce.getCacheClosedMembers();
       } else {
-        return Collections.EMPTY_SET;
+        return Collections.emptySet();
       }
     }
 
@@ -349,7 +347,7 @@ public class DistTXCommitMessage extends TXMessage {
             (DistTxCommitExceptionCollectingException) this.exception;
         return cce.getRegionDestroyedMembers(regionFullPath);
       } else {
-        return Collections.EMPTY_SET;
+        return Collections.emptySet();
       }
     }
 
@@ -387,14 +385,12 @@ public class DistTXCommitMessage extends TXMessage {
     /**
      * Determine if the commit processing was incomplete, if so throw a detailed exception
      * indicating the source of the problem
-     * 
-     * @param msgMap
      */
     public void handlePotentialCommitFailure(
         HashMap<DistributedMember, DistTXCoordinatorInterface> msgMap) {
       if (fatalExceptions.size() > 0) {
-        StringBuffer errorMessage = new StringBuffer("Incomplete commit of transaction ").append(id)
-            .append(".  Caused by the following exceptions: ");
+        StringBuilder errorMessage = new StringBuilder("Incomplete commit of transaction ")
+            .append(id).append(".  Caused by the following exceptions: ");
         for (Iterator i = fatalExceptions.entrySet().iterator(); i.hasNext();) {
           Map.Entry me = (Map.Entry) i.next();
           DistributedMember mem = (DistributedMember) me.getKey();
@@ -428,16 +424,13 @@ public class DistTXCommitMessage extends TXMessage {
     public Set getRegionDestroyedMembers(String regionFullPath) {
       Set members = (Set) this.regionExceptions.get(regionFullPath);
       if (members == null) {
-        members = Collections.EMPTY_SET;
+        members = Collections.emptySet();
       }
       return members;
     }
 
     /**
      * Protected by (this)
-     * 
-     * @param member
-     * @param exceptions
      */
     public void addExceptionsFromMember(InternalDistributedMember member, Set exceptions) {
       for (Iterator iter = exceptions.iterator(); iter.hasNext();) {

http://git-wip-us.apache.org/repos/asf/geode/blob/dc820690/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXPrecommitMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXPrecommitMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXPrecommitMessage.java
index ffbc3ba..0f7aa72 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXPrecommitMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXPrecommitMessage.java
@@ -53,12 +53,10 @@ import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.logging.log4j.LogMarker;
 
-/**
- * 
- */
 public final class DistTXPrecommitMessage extends TXMessage {
 
   private static final Logger logger = LogService.getLogger();
+
   ArrayList<DistTxEntryEvent> secondaryTransactionalOperations;
 
   /** for deserialization */
@@ -76,7 +74,7 @@ public final class DistTXPrecommitMessage extends TXMessage {
 
   @Override
   protected boolean operateOnTx(TXId txId, DistributionManager dm) throws RemoteOperationException {
-    GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+    InternalCache cache = GemFireCacheImpl.getInstance();
     TXManagerImpl txMgr = cache.getTXMgr();
 
     if (logger.isDebugEnabled()) {
@@ -132,7 +130,7 @@ public final class DistTXPrecommitMessage extends TXMessage {
     }
 
     // Send Response : Send false if conflict
-    DistTxPrecommitResponse finalResponse = new DistTxPrecommitResponse(precommitSuccess,
+    DistTxPreCommitResponse finalResponse = new DistTxPreCommitResponse(precommitSuccess,
         new ArrayList<ArrayList<DistTxThinEntryState>>(entryStateSortedMap.values()));
     DistTXPrecommitReplyMessage.send(getSender(), getProcessorId(), finalResponse,
         getReplySender(dm));
@@ -176,7 +174,7 @@ public final class DistTXPrecommitMessage extends TXMessage {
    * This is the reply to a {@link DistTXPrecommitMessage}.
    */
   public static final class DistTXPrecommitReplyMessage extends ReplyMessage {
-    private transient DistTxPrecommitResponse commitResponse;
+    private transient DistTxPreCommitResponse commitResponse;
 
     /**
      * Empty constructor to conform to DataSerializable interface
@@ -187,7 +185,7 @@ public final class DistTXPrecommitMessage extends TXMessage {
       fromData(in);
     }
 
-    private DistTXPrecommitReplyMessage(int processorId, DistTxPrecommitResponse val) {
+    private DistTXPrecommitReplyMessage(int processorId, DistTxPreCommitResponse val) {
       setProcessorId(processorId);
       this.commitResponse = val;
     }
@@ -209,7 +207,7 @@ public final class DistTXPrecommitMessage extends TXMessage {
      * @param replySender distribution manager used to send the reply
      */
     public static void send(InternalDistributedMember recipient, int processorId,
-        DistTxPrecommitResponse val, ReplySender replySender) throws RemoteOperationException {
+        DistTxPreCommitResponse val, ReplySender replySender) throws RemoteOperationException {
       Assert.assertTrue(recipient != null, "DistTXPhaseOneCommitReplyMessage NULL reply message");
       DistTXPrecommitReplyMessage m = new DistTXPrecommitReplyMessage(processorId, val);
       m.setRecipient(recipient);
@@ -253,18 +251,18 @@ public final class DistTXPrecommitMessage extends TXMessage {
     @Override
     public void fromData(DataInput in) throws IOException, ClassNotFoundException {
       super.fromData(in);
-      this.commitResponse = (DistTxPrecommitResponse) DataSerializer.readObject(in);
+      this.commitResponse = (DistTxPreCommitResponse) DataSerializer.readObject(in);
     }
 
     @Override
     public String toString() {
-      StringBuffer sb = new StringBuffer();
+      StringBuilder sb = new StringBuilder();
       sb.append("DistTXPhaseOneCommitReplyMessage").append("processorid=").append(this.processorId)
           .append(" reply to sender ").append(this.getSender());
       return sb.toString();
     }
 
-    public DistTxPrecommitResponse getCommitResponse() {
+    public DistTxPreCommitResponse getCommitResponse() {
       return commitResponse;
     }
   }
@@ -279,7 +277,7 @@ public final class DistTXPrecommitMessage extends TXMessage {
    */
   public static final class DistTxPrecommitReplyProcessor extends ReplyProcessor21 {
     private HashMap<DistributedMember, DistTXCoordinatorInterface> msgMap;
-    private Map<DistributedMember, DistTxPrecommitResponse> commitResponseMap;
+    private Map<DistributedMember, DistTxPreCommitResponse> commitResponseMap;
     private transient TXId txIdent = null;
 
     public DistTxPrecommitReplyProcessor(TXId txUniqId, DM dm, Set initMembers,
@@ -288,7 +286,7 @@ public final class DistTXPrecommitMessage extends TXMessage {
       this.msgMap = msgMap;
       // [DISTTX] TODO Do we need synchronised map?
       this.commitResponseMap =
-          Collections.synchronizedMap(new HashMap<DistributedMember, DistTxPrecommitResponse>());
+          Collections.synchronizedMap(new HashMap<DistributedMember, DistTxPreCommitResponse>());
       this.txIdent = txUniqId;
     }
 
@@ -340,7 +338,7 @@ public final class DistTXPrecommitMessage extends TXMessage {
             (DistTxPrecommitExceptionCollectingException) this.exception;
         return cce.getCacheClosedMembers();
       } else {
-        return Collections.EMPTY_SET;
+        return Collections.emptySet();
       }
     }
 
@@ -350,11 +348,11 @@ public final class DistTXPrecommitMessage extends TXMessage {
             (DistTxPrecommitExceptionCollectingException) this.exception;
         return cce.getRegionDestroyedMembers(regionFullPath);
       } else {
-        return Collections.EMPTY_SET;
+        return Collections.emptySet();
       }
     }
 
-    public Map<DistributedMember, DistTxPrecommitResponse> getCommitResponseMap() {
+    public Map<DistributedMember, DistTxPreCommitResponse> getCommitResponseMap() {
       return commitResponseMap;
     }
   }
@@ -388,14 +386,12 @@ public final class DistTXPrecommitMessage extends TXMessage {
     /**
      * Determine if the commit processing was incomplete, if so throw a detailed exception
      * indicating the source of the problem
-     * 
-     * @param msgMap
      */
     public void handlePotentialCommitFailure(
         HashMap<DistributedMember, DistTXCoordinatorInterface> msgMap) {
       if (fatalExceptions.size() > 0) {
-        StringBuffer errorMessage = new StringBuffer("Incomplete commit of transaction ").append(id)
-            .append(".  Caused by the following exceptions: ");
+        StringBuilder errorMessage = new StringBuilder("Incomplete commit of transaction ")
+            .append(id).append(".  Caused by the following exceptions: ");
         for (Iterator i = fatalExceptions.entrySet().iterator(); i.hasNext();) {
           Map.Entry me = (Map.Entry) i.next();
           DistributedMember mem = (DistributedMember) me.getKey();
@@ -429,16 +425,13 @@ public final class DistTXPrecommitMessage extends TXMessage {
     public Set getRegionDestroyedMembers(String regionFullPath) {
       Set members = (Set) this.regionExceptions.get(regionFullPath);
       if (members == null) {
-        members = Collections.EMPTY_SET;
+        members = Collections.emptySet();
       }
       return members;
     }
 
     /**
      * Protected by (this)
-     * 
-     * @param member
-     * @param exceptions
      */
     public void addExceptionsFromMember(InternalDistributedMember member, Set exceptions) {
       for (Iterator iter = exceptions.iterator(); iter.hasNext();) {
@@ -465,14 +458,14 @@ public final class DistTXPrecommitMessage extends TXMessage {
     }
   }
 
-  public static final class DistTxPrecommitResponse implements DataSerializableFixedID {
+  public static final class DistTxPreCommitResponse implements DataSerializableFixedID {
     private transient Boolean commitState;
     private transient ArrayList<ArrayList<DistTxThinEntryState>> distTxEventList;
 
     // Default constructor for serialisation
-    public DistTxPrecommitResponse() {}
+    public DistTxPreCommitResponse() {}
 
-    public DistTxPrecommitResponse(boolean precommitSuccess,
+    public DistTxPreCommitResponse(boolean precommitSuccess,
         ArrayList<ArrayList<DistTxThinEntryState>> eventList) {
       this.commitState = precommitSuccess;
       this.distTxEventList = eventList;

http://git-wip-us.apache.org/repos/asf/geode/blob/dc820690/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXRollbackMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXRollbackMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXRollbackMessage.java
index bfe302a..d4f5943 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXRollbackMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXRollbackMessage.java
@@ -75,7 +75,7 @@ public final class DistTXRollbackMessage extends TXMessage {
       logger.debug("Dist TX: Rollback: {}", txId);
     }
 
-    GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+    InternalCache cache = GemFireCacheImpl.getInstance();
     TXManagerImpl txMgr = cache.getTXMgr();
     final TXStateProxy txState = txMgr.getTXState();
     boolean rollbackSuccessful = false;
@@ -87,10 +87,6 @@ public final class DistTXRollbackMessage extends TXMessage {
               "DistTXRollbackMessage.operateOnTx: found a previously committed transaction:{}",
               txId);
         }
-        // TXCommitMessage cmsg = txMgr.getRecentlyCompletedMessage(txId);
-        // if (txMgr.isExceptionToken(cmsg)) {
-        // throw txMgr.getExceptionForToken(cmsg, txId);
-        // }
       } else if (txState != null) {
         // [DISTTX] TODO - Handle scenarios of no txState
         // if no TXState was created (e.g. due to only getEntry/size operations
@@ -219,7 +215,7 @@ public final class DistTXRollbackMessage extends TXMessage {
 
     @Override
     public String toString() {
-      StringBuffer sb = new StringBuffer();
+      StringBuilder sb = new StringBuilder();
       sb.append("DistTXRollbackReplyMessage ").append("processorid=").append(this.processorId)
           .append(" reply to sender ").append(this.getSender());
       return sb.toString();
@@ -232,7 +228,6 @@ public final class DistTXRollbackMessage extends TXMessage {
 
   /**
    * A processor to capture the value returned by {@link DistTXRollbackReplyMessage}
-   * 
    */
   public static class DistTXRollbackResponse extends RemoteOperationResponse {
     private volatile Boolean rollbackState;
@@ -275,9 +270,6 @@ public final class DistTXRollbackMessage extends TXMessage {
         final String msg = "DistTXRollbackResponse got RemoteOperationException; rethrowing";
         logger.debug(msg, e);
         throw e;
-      } catch (TransactionDataNotColocatedException e) {
-        // Throw this up to user!
-        throw e;
       }
       return rollbackState;
     }
@@ -354,7 +346,7 @@ public final class DistTXRollbackMessage extends TXMessage {
             (DistTxRollbackExceptionCollectingException) this.exception;
         return cce.getCacheClosedMembers();
       } else {
-        return Collections.EMPTY_SET;
+        return Collections.emptySet();
       }
     }
 
@@ -364,7 +356,7 @@ public final class DistTXRollbackMessage extends TXMessage {
             (DistTxRollbackExceptionCollectingException) this.exception;
         return cce.getRegionDestroyedMembers(regionFullPath);
       } else {
-        return Collections.EMPTY_SET;
+        return Collections.emptySet();
       }
     }
 
@@ -402,14 +394,12 @@ public final class DistTXRollbackMessage extends TXMessage {
     /**
      * Determine if the commit processing was incomplete, if so throw a detailed exception
      * indicating the source of the problem
-     * 
-     * @param msgMap
      */
     public void handlePotentialCommitFailure(
         HashMap<DistributedMember, DistTXCoordinatorInterface> msgMap) {
       if (fatalExceptions.size() > 0) {
-        StringBuffer errorMessage = new StringBuffer("Incomplete commit of transaction ").append(id)
-            .append(".  Caused by the following exceptions: ");
+        StringBuilder errorMessage = new StringBuilder("Incomplete commit of transaction ")
+            .append(id).append(".  Caused by the following exceptions: ");
         for (Iterator i = fatalExceptions.entrySet().iterator(); i.hasNext();) {
           Map.Entry me = (Map.Entry) i.next();
           DistributedMember mem = (DistributedMember) me.getKey();
@@ -443,16 +433,13 @@ public final class DistTXRollbackMessage extends TXMessage {
     public Set getRegionDestroyedMembers(String regionFullPath) {
       Set members = (Set) this.regionExceptions.get(regionFullPath);
       if (members == null) {
-        members = Collections.EMPTY_SET;
+        members = Collections.emptySet();
       }
       return members;
     }
 
     /**
      * Protected by (this)
-     * 
-     * @param member
-     * @param exceptions
      */
     public void addExceptionsFromMember(InternalDistributedMember member, Set exceptions) {
       for (Iterator iter = exceptions.iterator(); iter.hasNext();) {

http://git-wip-us.apache.org/repos/asf/geode/blob/dc820690/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java
index 68bde4e..50f36c2 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java
@@ -15,10 +15,8 @@
 package org.apache.geode.internal.cache;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.Map.Entry;
@@ -32,10 +30,9 @@ import org.apache.geode.cache.UnsupportedOperationInTransactionException;
 import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.internal.DM;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.cache.DistTXPrecommitMessage.DistTxPrecommitResponse;
-import org.apache.geode.internal.cache.DistributedPutAllOperation.PutAllEntryData;
-import org.apache.geode.internal.cache.DistributedRemoveAllOperation.RemoveAllEntryData;
+import org.apache.geode.internal.cache.DistTXPrecommitMessage.DistTxPreCommitResponse;
 import org.apache.geode.internal.cache.TXEntryState.DistTxThinEntryState;
+import org.apache.geode.internal.cache.partitioned.Bucket;
 import org.apache.geode.internal.cache.tier.sockets.VersionedObjectList;
 import org.apache.geode.internal.cache.tx.DistClientTXStateStub;
 import org.apache.geode.internal.cache.tx.DistTxEntryEvent;
@@ -50,8 +47,11 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
    */
   protected HashMap<DistributedMember, DistTXCoordinatorInterface> target2realDeals =
       new HashMap<>();
+
   private HashMap<LocalRegion, DistributedMember> rrTargets;
+
   private Set<DistributedMember> txRemoteParticpants = null; // other than local
+
   protected HashMap<String, ArrayList<DistTxThinEntryState>> txEntryEventMap = null;
 
   public DistTXStateProxyImplOnCoordinator(TXManagerImpl managerImpl, TXId id,
@@ -132,8 +132,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
    * those
    */
   private HashMap<DistributedMember, DistTXCoordinatorInterface> getSecondariesAndReplicasForTxOps() {
-    final GemFireCacheImpl cache =
-        GemFireCacheImpl.getExisting("getSecondariesAndReplicasForTxOps");
+    final InternalCache cache = GemFireCacheImpl.getExisting("getSecondariesAndReplicasForTxOps");
     InternalDistributedMember currentNode =
         cache.getInternalDistributedSystem().getDistributedMember();
 
@@ -143,7 +142,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
       DistributedMember originalTarget = e.getKey();
       DistTXCoordinatorInterface distPeerTxStateStub = e.getValue();
 
-      ArrayList<DistTxEntryEvent> primaryTxOps =
+      Iterable<DistTxEntryEvent> primaryTxOps =
           distPeerTxStateStub.getPrimaryTransactionalOperations();
       for (DistTxEntryEvent dtop : primaryTxOps) {
         LocalRegion lr = dtop.getRegion();
@@ -155,8 +154,8 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
           allNodes.remove(originalTarget);
           otherNodes = allNodes;
         } else if (lr instanceof DistributedRegion) {
-          otherNodes =
-              ((DistributedRegion) lr).getCacheDistributionAdvisor().adviseInitializedReplicates();
+          otherNodes = ((CacheDistributionAdvisee) lr).getCacheDistributionAdvisor()
+              .adviseInitializedReplicates();
           otherNodes.remove(originalTarget);
         }
 
@@ -196,7 +195,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
     }
 
     boolean finalResult = false;
-    final GemFireCacheImpl cache = GemFireCacheImpl.getExisting("Applying Dist TX Rollback");
+    final InternalCache cache = GemFireCacheImpl.getExisting("Applying Dist TX Rollback");
     final DM dm = cache.getDistributionManager();
     try {
       // Create Tx Participants
@@ -319,7 +318,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
       if (r instanceof PartitionedRegion) {
         target = getOwnerForKey(r, key);
       } else if (r instanceof BucketRegion) {
-        target = ((BucketRegion) r).getBucketAdvisor().getPrimary();
+        target = ((Bucket) r).getBucketAdvisor().getPrimary();
         // target = r.getMyId();
       } else { // replicated region
         target = getRRTarget(key, r);
@@ -390,7 +389,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
         throw new UnsupportedOperationInTransactionException(LocalizedStrings.DISTTX_TX_EXPECTED
             .toLocalizedString("DistPeerTXStateStub", this.realDeal.getClass().getSimpleName()));
       }
-      target2realDeals.put(target, (DistPeerTXStateStub) realDeal);
+      target2realDeals.put(target, (DistTXCoordinatorInterface) realDeal);
       if (logger.isDebugEnabled()) {
         logger.debug(
             "DistTXStateProxyImplOnCoordinator.getRealDeal(t) added TxState target2realDeals = "
@@ -438,7 +437,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
 
   private boolean doPrecommit() {
     boolean finalResult = true;
-    final GemFireCacheImpl cache = GemFireCacheImpl.getExisting("Applying Dist TX Precommit");
+    final InternalCache cache = GemFireCacheImpl.getExisting("Applying Dist TX Precommit");
     final DM dm = cache.getDistributionManager();
 
     // Create Tx Participants
@@ -468,7 +467,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
         } else if (lr instanceof PartitionedRegion || lr instanceof BucketRegion) {
           final PartitionedRegion pr;
           if (lr instanceof BucketRegion) {
-            pr = ((BucketRegion) lr).getPartitionedRegion();
+            pr = ((Bucket) lr).getPartitionedRegion();
           } else {
             pr = (PartitionedRegion) lr;
           }
@@ -528,8 +527,8 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
           new TreeMap<String, ArrayList<DistTxThinEntryState>>();
       ArrayList<ArrayList<DistTxThinEntryState>> entryEventList = null;
       if (localResult) {
-        localResult = ((DistTXStateOnCoordinator) localTXState)
-            .populateDistTxEntryStateList(entryStateSortedMap);
+        localResult =
+            ((DistTXState) localTXState).populateDistTxEntryStateList(entryStateSortedMap);
         if (localResult) {
           entryEventList =
               new ArrayList<ArrayList<DistTxThinEntryState>>(entryStateSortedMap.values());
@@ -572,11 +571,11 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
       // [DISTTX} TODO Handle stats
       // dm.getStats().incCommitWaits();
 
-      Map<DistributedMember, DistTxPrecommitResponse> remoteResults =
+      Map<DistributedMember, DistTxPreCommitResponse> remoteResults =
           processor.getCommitResponseMap();
-      for (Entry<DistributedMember, DistTxPrecommitResponse> e : remoteResults.entrySet()) {
+      for (Entry<DistributedMember, DistTxPreCommitResponse> e : remoteResults.entrySet()) {
         DistributedMember target = e.getKey();
-        DistTxPrecommitResponse remoteResponse = e.getValue();
+        DistTxPreCommitResponse remoteResponse = e.getValue();
         ArrayList<ArrayList<DistTxThinEntryState>> entryEventList =
             remoteResponse.getDistTxEntryEventList();
         populateEntryEventMap(target, entryEventList, sortedRegionName);
@@ -665,7 +664,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
    */
   private boolean doCommit() {
     boolean finalResult = true;
-    final GemFireCacheImpl cache = GemFireCacheImpl.getExisting("Applying Dist TX Commit");
+    final InternalCache cache = GemFireCacheImpl.getExisting("Applying Dist TX Commit");
     final DM dm = cache.getDistributionManager();
 
     // Create Tx Participants
@@ -716,7 +715,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
                 localTXState.getClass().getSimpleName()));
       }
       populateEntryEventList(dm.getId(), entryEventList, sortedRegionName);
-      ((DistTXStateOnCoordinator) localTXState).setDistTxEntryStates(entryEventList);
+      ((DistTXState) localTXState).setDistTxEntryStates(entryEventList);
       localTXState.commit();
       TXCommitMessage localResultMsg = localTXState.getCommitMessage();
       if (logger.isDebugEnabled()) {
@@ -821,7 +820,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
         Object key = putallOp.putAllData[i].key;
         int bucketId = putallOp.putAllData[i].getBucketId();
 
-        DistributedPutAllOperation putAllForBucket = bucketToPutallMap.get(bucketId);;
+        DistributedPutAllOperation putAllForBucket = bucketToPutallMap.get(bucketId);
         if (putAllForBucket == null) {
           // TODO DISTTX: event is never released
           EntryEventImpl event = EntryEventImpl.createPutAllEvent(null, region,
@@ -982,7 +981,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
   public DistributedMember getOwnerForKey(LocalRegion r, KeyInfo key) {
     DistributedMember m = r.getOwnerForKey(key);
     if (m == null) {
-      GemFireCacheImpl cache = GemFireCacheImpl.getExisting("getOwnerForKey");
+      InternalCache cache = GemFireCacheImpl.getExisting("getOwnerForKey");
       m = cache.getDistributedSystem().getDistributedMember();
     }
     return m;

http://git-wip-us.apache.org/repos/asf/geode/blob/dc820690/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
index 0a9ccd8..7ba7d0c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
@@ -12,7 +12,6 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-
 package org.apache.geode.internal.cache;
 
 import java.io.DataInput;
@@ -39,6 +38,7 @@ import org.apache.geode.cache.CacheEvent;
 import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.cache.DiskAccessException;
 import org.apache.geode.cache.EntryNotFoundException;
+import org.apache.geode.cache.EntryOperation;
 import org.apache.geode.cache.Operation;
 import org.apache.geode.cache.RegionDestroyedException;
 import org.apache.geode.cache.query.internal.cq.CqService;
@@ -58,12 +58,11 @@ import org.apache.geode.distributed.internal.membership.InternalDistributedMembe
 import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.CopyOnWriteHashSet;
 import org.apache.geode.internal.InternalDataSerializer;
-import org.apache.geode.internal.Version;
 import org.apache.geode.internal.cache.CacheDistributionAdvisor.CacheProfile;
-import org.apache.geode.internal.cache.DistributedPutAllOperation.PutAllMessage;
 import org.apache.geode.internal.cache.EntryEventImpl.OldValueImporter;
 import org.apache.geode.internal.cache.FilterRoutingInfo.FilterInfo;
 import org.apache.geode.internal.cache.UpdateOperation.UpdateMessage;
+import org.apache.geode.internal.cache.partitioned.Bucket;
 import org.apache.geode.internal.cache.partitioned.PartitionMessage;
 import org.apache.geode.internal.cache.persistence.PersistentMemberID;
 import org.apache.geode.internal.cache.tier.MessageType;
@@ -75,26 +74,26 @@ import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.logging.log4j.LocalizedMessage;
 import org.apache.geode.internal.logging.log4j.LogMarker;
+import org.apache.geode.internal.offheap.Releasable;
 import org.apache.geode.internal.offheap.StoredObject;
 import org.apache.geode.internal.offheap.annotations.Released;
 import org.apache.geode.internal.offheap.annotations.Unretained;
 import org.apache.geode.internal.sequencelog.EntryLogger;
 import org.apache.geode.internal.util.DelayedAction;
 
-/**
- * 
- */
 public abstract class DistributedCacheOperation {
 
   private static final Logger logger = LogService.getLogger();
 
   public static double LOSS_SIMULATION_RATIO = 0; // test hook
+
   public static Random LOSS_SIMULATION_GENERATOR;
 
   public static long SLOW_DISTRIBUTION_MS = 0; // test hook
 
   // constants used in subclasses and distribution messages
   // should use enum in source level 1.5+
+
   /**
    * Deserialization policy: do not deserialize (for byte array, null or cases where the value
    * should stay serialized)
@@ -145,11 +144,12 @@ public abstract class DistributedCacheOperation {
   }
 
 
-  public final static byte DESERIALIZATION_POLICY_NUMBITS =
+  public static final byte DESERIALIZATION_POLICY_NUMBITS =
       DistributionMessage.getNumBits(DESERIALIZATION_POLICY_LAZY);
 
   public static final short DESERIALIZATION_POLICY_END =
       (short) (1 << DESERIALIZATION_POLICY_NUMBITS);
+
   public static final short DESERIALIZATION_POLICY_MASK = (short) (DESERIALIZATION_POLICY_END - 1);
 
   public static boolean testSendingOldValues;
@@ -263,7 +263,7 @@ public abstract class DistributedCacheOperation {
     try {
       _distribute();
     } catch (InvalidVersionException e) {
-      if (logger.isDebugEnabled()) {
+      if (logger.isTraceEnabled()) {
         logger.trace(LogMarker.DM, "PutAll failed since versions were missing; retrying again", e);
       }
 
@@ -283,7 +283,7 @@ public abstract class DistributedCacheOperation {
     DistributedRegion region = getRegion();
     if (viewVersion != -1) {
       region.getDistributionAdvisor().endOperation(viewVersion);
-      if (logger.isDebugEnabled()) {
+      if (logger.isTraceEnabled()) {
         logger.trace(LogMarker.STATE_FLUSH_OP, "done dispatching operation in view version {}",
             viewVersion);
       }
@@ -317,7 +317,7 @@ public abstract class DistributedCacheOperation {
     if (SLOW_DISTRIBUTION_MS > 0) { // test hook
       try {
         Thread.sleep(SLOW_DISTRIBUTION_MS);
-      } catch (InterruptedException e) {
+      } catch (InterruptedException ignore) {
         Thread.currentThread().interrupt();
       }
       SLOW_DISTRIBUTION_MS = 0;
@@ -335,15 +335,15 @@ public abstract class DistributedCacheOperation {
       }
 
       // some members requiring old value are also in the cache op recipients set
-      Set needsOldValueInCacheOp = Collections.EMPTY_SET;
+      Set needsOldValueInCacheOp = Collections.emptySet();
 
       // set client routing information into the event
       boolean routingComputed = false;
       FilterRoutingInfo filterRouting = null;
       // recipients that will get a cacheop msg and also a PR message
-      Set twoMessages = Collections.EMPTY_SET;
+      Set twoMessages = Collections.emptySet();
       if (region.isUsedForPartitionedRegionBucket()) {
-        twoMessages = ((BucketRegion) region).getBucketAdvisor().adviseRequiresTwoMessages();
+        twoMessages = ((Bucket) region).getBucketAdvisor().adviseRequiresTwoMessages();
         routingComputed = true;
         filterRouting = getRecipientFilterRouting(recipients);
         if (filterRouting != null) {
@@ -355,7 +355,7 @@ public abstract class DistributedCacheOperation {
 
       // some members need PR notification of the change for client/wan
       // notification
-      Set adjunctRecipients = Collections.EMPTY_SET;
+      Set adjunctRecipients = Collections.emptySet();
 
       // Partitioned region listener notification messages piggyback on this
       // operation's replyprocessor and need to be sent at the same time as
@@ -377,20 +377,17 @@ public abstract class DistributedCacheOperation {
         recipients.removeAll(needsOldValueInCacheOp);
       }
 
-      Set cachelessNodes = Collections.EMPTY_SET;
-      Set adviseCacheServers = Collections.EMPTY_SET;
-      Set<InternalDistributedMember> cachelessNodesWithNoCacheServer =
-          new HashSet<InternalDistributedMember>();
+      Set cachelessNodes = Collections.emptySet();
+      Set adviseCacheServers;
+      Set<InternalDistributedMember> cachelessNodesWithNoCacheServer = new HashSet<>();
       if (region.getDistributionConfig().getDeltaPropagation() && this.supportsDeltaPropagation()) {
         cachelessNodes = region.getCacheDistributionAdvisor().adviseEmptys();
         if (!cachelessNodes.isEmpty()) {
           List list = new ArrayList(cachelessNodes);
           for (Object member : cachelessNodes) {
-            if (!recipients.contains(member)) {
+            if (!recipients.contains(member) || adjunctRecipients.contains(member)) {
               // Don't include those originally excluded.
               list.remove(member);
-            } else if (adjunctRecipients.contains(member)) {
-              list.remove(member);
             }
           }
           cachelessNodes.clear();
@@ -421,10 +418,10 @@ public abstract class DistributedCacheOperation {
         if (!reliableOp || region.isNoDistributionOk()) {
           // nothing needs be done in this case
         } else {
-          region.handleReliableDistribution(Collections.EMPTY_SET);
+          region.handleReliableDistribution(Collections.emptySet());
         }
 
-        /** compute local client routing before waiting for an ack only for a bucket */
+        // compute local client routing before waiting for an ack only for a bucket
         if (region.isUsedForPartitionedRegionBucket()) {
           FilterInfo filterInfo = getLocalFilterRouting(filterRouting);
           this.event.setLocalFilterInfo(filterInfo);
@@ -433,7 +430,7 @@ public abstract class DistributedCacheOperation {
       } else {
         boolean directAck = false;
         boolean useMulticast = region.getMulticastEnabled()
-            && region.getSystem().getConfig().getMcastPort() != 0 && this.supportsMulticast();;
+            && region.getSystem().getConfig().getMcastPort() != 0 && this.supportsMulticast();
         boolean shouldAck = shouldAck();
 
         if (shouldAck) {
@@ -491,7 +488,7 @@ public abstract class DistributedCacheOperation {
                     recipients);
               }
               waitForMembers.removeAll(recipients);
-              recipients = Collections.EMPTY_SET;
+              recipients = Collections.emptySet();
             }
           }
           if (reliableOp) {
@@ -625,7 +622,7 @@ public abstract class DistributedCacheOperation {
           }
 
           adjunctRecipientsWithNoCacheServer.addAll(adjunctRecipients);
-          adviseCacheServers = ((BucketRegion) region).getPartitionedRegion()
+          adviseCacheServers = ((Bucket) region).getPartitionedRegion()
               .getCacheDistributionAdvisor().adviseCacheServers();
           adjunctRecipientsWithNoCacheServer.removeAll(adviseCacheServers);
 
@@ -652,7 +649,7 @@ public abstract class DistributedCacheOperation {
           }
         }
 
-        /** compute local client routing before waiting for an ack only for a bucket */
+        // compute local client routing before waiting for an ack only for a bucket
         if (region.isUsedForPartitionedRegionBucket()) {
           FilterInfo filterInfo = getLocalFilterRouting(filterRouting);
           event.setLocalFilterInfo(filterInfo);
@@ -693,7 +690,6 @@ public abstract class DistributedCacheOperation {
     }
   }
 
-
   /**
    * Cleanup destroyed events in CQ result cache for remote CQs. While maintaining the CQ results
    * key caching. the destroy event keys are marked as destroyed instead of removing them, this is
@@ -710,7 +706,7 @@ public abstract class DistributedCacheOperation {
         continue;
       }
 
-      CacheProfile cf = (CacheProfile) ((BucketRegion) getRegion()).getPartitionedRegion()
+      CacheProfile cf = (CacheProfile) ((Bucket) getRegion()).getPartitionedRegion()
           .getCacheDistributionAdvisor().getProfile(m);
 
       if (cf == null || cf.filterProfile == null || cf.filterProfile.isLocalProfile()
@@ -718,7 +714,6 @@ public abstract class DistributedCacheOperation {
         continue;
       }
 
-
       for (Object value : cf.filterProfile.getCqMap().values()) {
         ServerCQ cq = (ServerCQ) value;
 
@@ -726,16 +721,14 @@ public abstract class DistributedCacheOperation {
           Long cqID = e.getKey();
           // For the CQs satisfying the event with destroy CQEvent, remove
           // the entry form CQ cache.
-          if (cq.getFilterID() == cqID
-              && (e.getValue().equals(Integer.valueOf(MessageType.LOCAL_DESTROY)))) {
-            cq.removeFromCqResultKeys(((EntryEventImpl) event).getKey(), true);
+          if (cq.getFilterID() == cqID && (e.getValue().equals(MessageType.LOCAL_DESTROY))) {
+            cq.removeFromCqResultKeys(((EntryOperation) event).getKey(), true);
           }
         }
       }
     }
   }
 
-
   /**
    * Get the adjunct receivers for a partitioned region operation
    * 
@@ -752,9 +745,6 @@ public abstract class DistributedCacheOperation {
 
   /**
    * perform any operation-specific initialization on the given reply processor
-   * 
-   * @param p
-   * @param msg
    */
   protected void initProcessor(CacheOperationReplyProcessor p, CacheOperationMessage msg) {
     // nothing to do here - see UpdateMessage
@@ -783,9 +773,6 @@ public abstract class DistributedCacheOperation {
     }
   }
 
-  /**
-   * @param closedMembers
-   */
   private void handleClosedMembers(Set<InternalDistributedMember> closedMembers,
       Map<InternalDistributedMember, PersistentMemberID> persistentIds) {
     if (persistentIds == null) {
@@ -837,11 +824,7 @@ public abstract class DistributedCacheOperation {
       return null;
     }
     CacheDistributionAdvisor advisor;
-    // if (region.isUsedForPartitionedRegionBucket()) {
-    advisor = ((BucketRegion) region).getPartitionedRegion().getCacheDistributionAdvisor();
-    // } else {
-    // advisor = ((DistributedRegion)region).getCacheDistributionAdvisor();
-    // }
+    advisor = region.getPartitionedRegion().getCacheDistributionAdvisor();
     return advisor.adviseFilterRouting(this.event, cacheOpRecipients);
   }
 
@@ -915,7 +898,6 @@ public abstract class DistributedCacheOperation {
     protected final static short PERSISTENT_TAG_MASK = (VERSION_TAG_MASK << 1);
     protected final static short UNRESERVED_FLAGS_START = (PERSISTENT_TAG_MASK << 1);
 
-
     private final static int INHIBIT_NOTIFICATIONS_MASK = 0x400;
 
     public boolean needsRouting;
@@ -959,7 +941,6 @@ public abstract class DistributedCacheOperation {
       return this.op;
     }
 
-
     /** sets the concurrency versioning tag for this message */
     public void setVersionTag(VersionTag tag) {
       this.versionTag = tag;
@@ -1001,8 +982,6 @@ public abstract class DistributedCacheOperation {
     /**
      * process a reply
      * 
-     * @param reply
-     * @param processor
      * @return true if the reply-processor should continue to process this response
      */
     boolean processReply(ReplyMessage reply, CacheOperationReplyProcessor processor) {
@@ -1019,13 +998,11 @@ public abstract class DistributedCacheOperation {
      * @param event the entry event that contains the old value
      */
     public void appendOldValueToMessage(EntryEventImpl event) {
-      {
-        @Unretained
-        Object val = event.getRawOldValue();
-        if (val == null || val == Token.NOT_AVAILABLE || val == Token.REMOVED_PHASE1
-            || val == Token.REMOVED_PHASE2 || val == Token.DESTROYED || val == Token.TOMBSTONE) {
-          return;
-        }
+      @Unretained
+      Object val = event.getRawOldValue();
+      if (val == null || val == Token.NOT_AVAILABLE || val == Token.REMOVED_PHASE1
+          || val == Token.REMOVED_PHASE2 || val == Token.DESTROYED || val == Token.TOMBSTONE) {
+        return;
       }
       event.exportOldValue(this);
     }
@@ -1086,7 +1063,7 @@ public abstract class DistributedCacheOperation {
 
     protected LocalRegion getLocalRegionForProcessing(DistributionManager dm) {
       Assert.assertTrue(this.regionPath != null, "regionPath was null");
-      GemFireCacheImpl gfc = (GemFireCacheImpl) CacheFactory.getInstance(dm.getSystem());
+      InternalCache gfc = (InternalCache) CacheFactory.getInstance(dm.getSystem());
       return gfc.getRegionByPathForProcessing(this.regionPath);
     }
 
@@ -1112,7 +1089,7 @@ public abstract class DistributedCacheOperation {
         final LocalRegion lclRgn = getLocalRegionForProcessing(dm);
         sendReply = false;
         basicProcess(dm, lclRgn);
-      } catch (CancelException e) {
+      } catch (CancelException ignore) {
         this.closed = true;
         if (logger.isDebugEnabled()) {
           logger.debug("{} Cancelled: nothing to do", this);
@@ -1203,7 +1180,7 @@ public abstract class DistributedCacheOperation {
             // region
             if (!rgn.isEventTrackerInitialized()
                 && (rgn.getDataPolicy().withReplication() || rgn.getDataPolicy().withPreloaded())) {
-              if (logger.isDebugEnabled()) {
+              if (logger.isTraceEnabled()) {
                 logger.trace(LogMarker.DM_BRIDGE_SERVER, "Ignoring possible duplicate event");
               }
               return;
@@ -1213,15 +1190,15 @@ public abstract class DistributedCacheOperation {
           sendReply = operateOnRegion(event, dm) && sendReply;
         } finally {
           if (event instanceof EntryEventImpl) {
-            ((EntryEventImpl) event).release();
+            ((Releasable) event).release();
           }
         }
-      } catch (RegionDestroyedException e) {
+      } catch (RegionDestroyedException ignore) {
         this.closed = true;
         if (logger.isDebugEnabled()) {
           logger.debug("{} Region destroyed: nothing to do", this);
         }
-      } catch (CancelException e) {
+      } catch (CancelException ignore) {
         this.closed = true;
         if (logger.isDebugEnabled()) {
           logger.debug("{} Cancelled: nothing to do", this);
@@ -1231,7 +1208,7 @@ public abstract class DistributedCacheOperation {
         if (!lclRgn.isDestroyed()) {
           logger.error("Got disk access exception, expected region to be destroyed", e);
         }
-      } catch (EntryNotFoundException e) {
+      } catch (EntryNotFoundException ignore) {
         this.appliedOperation = true;
         if (logger.isDebugEnabled()) {
           logger.debug("{} Entry not found, nothing to do", this);
@@ -1275,8 +1252,7 @@ public abstract class DistributedCacheOperation {
       if (pId == 0 && (dm instanceof DM) && !this.directAck) {// Fix for #41871
         // distributed-no-ack message. Don't respond
       } else {
-        ReplyException exception = rex;
-        ReplyMessage.send(recipient, pId, exception, dm, !this.appliedOperation, this.closed, false,
+        ReplyMessage.send(recipient, pId, rex, dm, !this.appliedOperation, this.closed, false,
             isInternal());
       }
     }
@@ -1312,9 +1288,6 @@ public abstract class DistributedCacheOperation {
      * When an event is discarded because of an attempt to overwrite a more recent change we still
      * need to deliver that event to clients. Clients can then perform their own concurrency checks
      * on the event.
-     * 
-     * @param rgn
-     * @param ev
      */
     protected void dispatchElidedEvent(LocalRegion rgn, EntryEventImpl ev) {
       if (logger.isDebugEnabled()) {
@@ -1325,11 +1298,6 @@ public abstract class DistributedCacheOperation {
       rgn.notifyBridgeClients(ev);
     }
 
-    // protected LocalRegion getRegionFromPath(InternalDistributedSystem sys,
-    // String path) {
-    // return LocalRegion.getRegionFromPath(sys, path);
-    // }
-
     protected abstract InternalCacheEvent createEvent(DistributedRegion rgn)
         throws EntryNotFoundException;
 
@@ -1380,7 +1348,6 @@ public abstract class DistributedCacheOperation {
 
     @Override
     public void fromData(DataInput in) throws IOException, ClassNotFoundException {
-      // super.fromData(in);
       short bits = in.readShort();
       short extBits = in.readShort();
       this.flags = bits;
@@ -1424,8 +1391,6 @@ public abstract class DistributedCacheOperation {
 
     @Override
     public void toData(DataOutput out) throws IOException {
-      // super.toData(out);
-
       short bits = 0;
       short extendedBits = 0;
       bits = computeCompressedShort(bits);
@@ -1611,8 +1576,7 @@ public abstract class DistributedCacheOperation {
   static class CacheOperationReplyProcessor extends DirectReplyProcessor {
     public CacheOperationMessage msg;
 
-    public CopyOnWriteHashSet<InternalDistributedMember> closedMembers =
-        new CopyOnWriteHashSet<InternalDistributedMember>();
+    public CopyOnWriteHashSet<InternalDistributedMember> closedMembers = new CopyOnWriteHashSet<>();
 
     public CacheOperationReplyProcessor(InternalDistributedSystem system, Collection initMembers) {
       super(system, initMembers);


Mime
View raw message