geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dschnei...@apache.org
Subject geode git commit: GEODE-2811: close OffHeapEvictor when cache is closed
Date Thu, 27 Apr 2017 18:08:33 GMT
Repository: geode
Updated Branches:
  refs/heads/develop c98bc8bdb -> bee0b7d57


GEODE-2811: close OffHeapEvictor when cache is closed

Rejected executions are now ignored if shutting down.
execute now used instead of submit.
Close logic on HeapEvictor improved to prevent race conditions and NPEs.


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/bee0b7d5
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/bee0b7d5
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/bee0b7d5

Branch: refs/heads/develop
Commit: bee0b7d570f62848318ffcd719efaee6e5884102
Parents: c98bc8b
Author: Darrel Schneider <dschneider@pivotal.io>
Authored: Fri Apr 21 17:09:23 2017 -0700
Committer: Darrel Schneider <dschneider@pivotal.io>
Committed: Thu Apr 27 11:05:04 2017 -0700

----------------------------------------------------------------------
 .../geode/internal/cache/GemFireCacheImpl.java  | 30 ++++++--
 .../geode/internal/cache/RegionEvictorTask.java |  9 +--
 .../geode/internal/cache/lru/HeapEvictor.java   | 73 +++++++++++---------
 .../geode/internal/cache/EvictionTestBase.java  |  2 +-
 .../internal/cache/GemFireCacheImplTest.java    | 21 ++++++
 5 files changed, 95 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/bee0b7d5/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
index 978e863..de7558c 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
@@ -2039,6 +2039,16 @@ public class GemFireCacheImpl
     }
   }
 
+  /** Used by test to inject an evictor */
+  void setOffHeapEvictor(OffHeapEvictor evictor) {
+    this.offHeapEvictor = evictor;
+  }
+
+  /** Used by test to inject an evictor */
+  void setHeapEvictor(HeapEvictor evictor) {
+    this.heapEvictor = evictor;
+  }
+
   @Override
   public PersistentMemberManager getPersistentMemberManager() {
     return this.persistentMemberManager;
@@ -2313,10 +2323,8 @@ public class GemFireCacheImpl
           if (cms != null) {
             cms.close();
           }
-          HeapEvictor he = this.heapEvictor;
-          if (he != null) {
-            he.close();
-          }
+          closeHeapEvictor();
+          closeOffHeapEvictor();
         } catch (CancelException ignore) {
           // make sure the disk stores get closed
           closeDiskStores();
@@ -2385,6 +2393,20 @@ public class GemFireCacheImpl
 
   }
 
+  private void closeOffHeapEvictor() {
+    OffHeapEvictor evictor = this.offHeapEvictor;
+    if (evictor != null) {
+      evictor.close();
+    }
+  }
+
+  private void closeHeapEvictor() {
+    HeapEvictor evictor = this.heapEvictor;
+    if (evictor != null) {
+      evictor.close();
+    }
+  }
+
   @Override
   public boolean isReconnecting() {
     return this.system.isReconnecting();

http://git-wip-us.apache.org/repos/asf/geode/blob/bee0b7d5/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEvictorTask.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEvictorTask.java b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEvictorTask.java
index da6a671..a467726 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEvictorTask.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEvictorTask.java
@@ -34,7 +34,7 @@ import java.util.concurrent.Callable;
  * @since GemFire 6.0
  * 
  */
-public class RegionEvictorTask implements Callable<Object> {
+public class RegionEvictorTask implements Runnable {
 
   private static final Logger logger = LogService.getLogger();
 
@@ -85,7 +85,8 @@ public class RegionEvictorTask implements Callable<Object> {
     return this.evictor;
   }
 
-  public Object call() throws Exception {
+  @Override
+  public void run() {
     getGemFireCache().getCachePerfStats().incEvictorJobsStarted();
     long bytesEvicted = 0;
     long totalBytesEvicted = 0;
@@ -96,7 +97,7 @@ public class RegionEvictorTask implements Callable<Object> {
         synchronized (this.regionSet) {
           if (this.regionSet.isEmpty()) {
             lastTaskCompletionTime = System.currentTimeMillis();
-            return null;
+            return;
           }
           // TODO: Yogesh : try Fisher-Yates shuffle algorithm
           Iterator<LocalRegion> iter = regionSet.iterator();
@@ -111,7 +112,7 @@ public class RegionEvictorTask implements Callable<Object> {
               if (totalBytesEvicted >= bytesToEvictPerTask || !getHeapEvictor().mustEvict()
                   || this.regionSet.size() == 0) {
                 lastTaskCompletionTime = System.currentTimeMillis();
-                return null;
+                return;
               }
             } catch (RegionDestroyedException rd) {
               region.cache.getCancelCriterion().checkCancelInProgress(rd);

http://git-wip-us.apache.org/repos/asf/geode/blob/bee0b7d5/geode-core/src/main/java/org/apache/geode/internal/cache/lru/HeapEvictor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/lru/HeapEvictor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/lru/HeapEvictor.java
index b22bb0e..707b408 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/lru/HeapEvictor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/lru/HeapEvictor.java
@@ -89,12 +89,12 @@ public class HeapEvictor implements ResourceListener<MemoryEvent> {
 
   protected final Cache cache;
 
-  private final ArrayList testTaskSetSizes = new ArrayList();
+  private final ArrayList<Integer> testTaskSetSizes = new ArrayList<>();
   public volatile int testAbortAfterLoopCount = Integer.MAX_VALUE;
 
   private BlockingQueue<Runnable> poolQueue;
 
-  private AtomicBoolean isRunning = new AtomicBoolean(true);
+  private final AtomicBoolean isRunning = new AtomicBoolean(true);
 
   public HeapEvictor(Cache gemFireCache) {
     this.cache = gemFireCache;
@@ -198,12 +198,19 @@ public class HeapEvictor implements ResourceListener<MemoryEvent> {
   /**
    * The task(i.e the region on which eviction needs to be performed) is assigned to the threadpool.
    */
-  private void submitRegionEvictionTask(Callable<Object> task) {
-    evictorThreadPool.submit(task);
+  private void executeInThreadPool(Runnable task) {
+    try {
+      evictorThreadPool.execute(task);
+    } catch (RejectedExecutionException ex) {
+      // ignore rejection if evictor no longer running
+      if (isRunning()) {
+        throw ex;
+      }
+    }
   }
 
   public ThreadPoolExecutor getEvictorThreadPool() {
-    if (isRunning.get()) {
+    if (isRunning()) {
       return evictorThreadPool;
     }
     return null;
@@ -215,7 +222,7 @@ public class HeapEvictor implements ResourceListener<MemoryEvent> {
    * @return sum of scheduled and running tasks
    */
   public int getRunningAndScheduledTasks() {
-    if (isRunning.get()) {
+    if (isRunning()) {
       return this.evictorThreadPool.getActiveCount() + this.evictorThreadPool.getQueue().size();
     }
     return -1;
@@ -243,35 +250,36 @@ public class HeapEvictor implements ResourceListener<MemoryEvent> {
       long bytesToEvictPerTask = (long) (getTotalBytesToEvict() * percentage);
       regionsForSingleTask.add(lr);
       if (mustEvict()) {
-        submitRegionEvictionTask(
-            new RegionEvictorTask(regionsForSingleTask, this, bytesToEvictPerTask));
+        executeInThreadPool(new RegionEvictorTask(regionsForSingleTask, this, bytesToEvictPerTask));
       } else {
         break;
       }
     }
   }
 
-  private Set<Callable<Object>> createRegionEvictionTasks() {
-    Set<Callable<Object>> evictorTaskSet = new HashSet<Callable<Object>>();
-    int threadsAvailable = getEvictorThreadPool().getCorePoolSize();
+  private Set<RegionEvictorTask> createRegionEvictionTasks() {
+    ThreadPoolExecutor pool = getEvictorThreadPool();
+    if (pool == null) {
+      return Collections.emptySet();
+    }
+    int threadsAvailable = pool.getCorePoolSize();
     long bytesToEvictPerTask = getTotalBytesToEvict() / threadsAvailable;
     List<LocalRegion> allRegionList = getAllRegionList();
+    if (allRegionList.isEmpty()) {
+      return Collections.emptySet();
+    }
     // This shuffling is not required when eviction triggered for the first time
     Collections.shuffle(allRegionList);
     int allRegionSetSize = allRegionList.size();
-    if (allRegionList.isEmpty()) {
-      return evictorTaskSet;
-    }
+    Set<RegionEvictorTask> evictorTaskSet = new HashSet<>();
     if (allRegionSetSize <= threadsAvailable) {
       for (LocalRegion region : allRegionList) {
         List<LocalRegion> regionList = new ArrayList<LocalRegion>(1);
         regionList.add(region);
-        Callable<Object> task = new RegionEvictorTask(regionList, this, bytesToEvictPerTask);
+        RegionEvictorTask task = new RegionEvictorTask(regionList, this, bytesToEvictPerTask);
         evictorTaskSet.add(task);
       }
-      Iterator iterator = evictorTaskSet.iterator();
-      while (iterator.hasNext()) {
-        RegionEvictorTask regionEvictorTask = (RegionEvictorTask) iterator.next();
+      for (RegionEvictorTask regionEvictorTask : evictorTaskSet) {
         testTaskSetSizes.add(regionEvictorTask.getRegionList().size());
       }
       return evictorTaskSet;
@@ -295,9 +303,7 @@ public class HeapEvictor implements ResourceListener<MemoryEvent> {
       regionsForSingleTask.add(itr.next());
     }
 
-    Iterator iterator = evictorTaskSet.iterator();
-    while (iterator.hasNext()) {
-      RegionEvictorTask regionEvictorTask = (RegionEvictorTask) iterator.next();
+    for (RegionEvictorTask regionEvictorTask : evictorTaskSet) {
       testTaskSetSizes.add(regionEvictorTask.getRegionList().size());
     }
     return evictorTaskSet;
@@ -327,7 +333,7 @@ public class HeapEvictor implements ResourceListener<MemoryEvent> {
 
     // Do we care about eviction events and did the eviction event originate
     // in this VM ...
-    if (this.isRunning.get() && event.isLocal()) {
+    if (isRunning() && event.isLocal()) {
       if (event.getState().isEviction()) {
         final LogWriter logWriter = cache.getLogger();
 
@@ -378,8 +384,8 @@ public class HeapEvictor implements ResourceListener<MemoryEvent> {
                 if (EVICT_HIGH_ENTRY_COUNT_BUCKETS_FIRST) {
                   createAndSubmitWeightedRegionEvictionTasks();
                 } else {
-                  for (Callable<Object> task : createRegionEvictionTasks()) {
-                    submitRegionEvictionTask(task);
+                  for (RegionEvictorTask task : createRegionEvictionTasks()) {
+                    executeInThreadPool(task);
                   }
                 }
                 RegionEvictorTask.setLastTaskCompletionTime(System.currentTimeMillis());
@@ -408,14 +414,14 @@ public class HeapEvictor implements ResourceListener<MemoryEvent> {
                 if (HeapEvictor.this.mustEvict.get()) {
                   // Submit this runnable back into the thread pool and execute
                   // another pass at eviction.
-                  HeapEvictor.this.evictorThreadPool.submit(this);
+                  executeInThreadPool(this);
                 }
               } catch (RegionDestroyedException e) {
                 // A region destroyed exception might be thrown for Region.size() when a bucket
                 // moves due to rebalancing. retry submitting the eviction task without
                 // logging an error message. fixes bug 48162
                 if (HeapEvictor.this.mustEvict.get()) {
-                  HeapEvictor.this.evictorThreadPool.submit(this);
+                  executeInThreadPool(this);
                 }
               }
             }
@@ -423,7 +429,7 @@ public class HeapEvictor implements ResourceListener<MemoryEvent> {
         };
 
         // Submit the first pass at eviction into the pool
-        this.evictorThreadPool.execute(evictionManagerTask);
+        executeInThreadPool(evictionManagerTask);
 
       } else {
         this.mustEvict.set(false);
@@ -447,12 +453,17 @@ public class HeapEvictor implements ResourceListener<MemoryEvent> {
   }
 
   public void close() {
-    getEvictorThreadPool().shutdownNow();
-    isRunning.set(false);
+    if (isRunning.compareAndSet(true, false)) {
+      evictorThreadPool.shutdownNow();
+    }
+  }
+
+  public boolean isRunning() {
+    return isRunning.get();
   }
 
-  public ArrayList testOnlyGetSizeOfTasks() {
-    if (isRunning.get())
+  public ArrayList<Integer> testOnlyGetSizeOfTasks() {
+    if (isRunning())
       return testTaskSetSizes;
     return null;
   }

http://git-wip-us.apache.org/repos/asf/geode/blob/bee0b7d5/geode-core/src/test/java/org/apache/geode/internal/cache/EvictionTestBase.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/EvictionTestBase.java b/geode-core/src/test/java/org/apache/geode/internal/cache/EvictionTestBase.java
index a0f7af5..e30636c 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/EvictionTestBase.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/EvictionTestBase.java
@@ -283,7 +283,7 @@ public class EvictionTestBase extends JUnit4CacheTestCase {
     }
   }
 
-  public ArrayList getTestTaskSetSizes() {
+  public ArrayList<Integer> getTestTaskSetSizes() {
     return getEvictor().testOnlyGetSizeOfTasks();
   }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/bee0b7d5/geode-core/src/test/java/org/apache/geode/internal/cache/GemFireCacheImplTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/GemFireCacheImplTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/GemFireCacheImplTest.java
index 6838e74..a24fc5a 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/GemFireCacheImplTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/GemFireCacheImplTest.java
@@ -28,6 +28,8 @@ import org.junit.experimental.categories.Category;
 
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.internal.SystemTimer;
+import org.apache.geode.internal.cache.lru.HeapEvictor;
+import org.apache.geode.internal.cache.lru.OffHeapEvictor;
 import org.apache.geode.pdx.internal.TypeRegistry;
 import org.apache.geode.test.fake.Fakes;
 import org.apache.geode.test.junit.categories.UnitTest;
@@ -62,6 +64,25 @@ public class GemFireCacheImplTest {
   }
 
   @Test
+  public void checkEvictorsClosed() {
+    InternalDistributedSystem ds = Fakes.distributedSystem();
+    CacheConfig cc = new CacheConfig();
+    TypeRegistry typeRegistry = mock(TypeRegistry.class);
+    SystemTimer ccpTimer = mock(SystemTimer.class);
+    HeapEvictor he = mock(HeapEvictor.class);
+    OffHeapEvictor ohe = mock(OffHeapEvictor.class);
+    GemFireCacheImpl gfc = GemFireCacheImpl.createWithAsyncEventListeners(ds, cc, typeRegistry);
+    try {
+      gfc.setHeapEvictor(he);
+      gfc.setOffHeapEvictor(ohe);
+    } finally {
+      gfc.close();
+    }
+    verify(he, times(1)).close();
+    verify(ohe, times(1)).close();
+  }
+
+  @Test
   public void checkThatAsyncEventListenersUseAllThreadsInPool() {
     InternalDistributedSystem ds = Fakes.distributedSystem();
     CacheConfig cc = new CacheConfig();


Mime
View raw message