geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sai_boorlaga...@apache.org
Subject [geode] branch develop updated: GEODE-5252: Race in management adapter could fail to create MXBeans. (#1993)
Date Mon, 04 Jun 2018 14:30:45 GMT
This is an automated email from the ASF dual-hosted git repository.

sai_boorlagadda pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new b731658  GEODE-5252: Race in management adapter could fail to create MXBeans. (#1993)
b731658 is described below

commit b73165836e765298d0cbef692737e9820021219a
Author: Sai Boorlagadda <sai.boorlagadda@gmail.com>
AuthorDate: Mon Jun 4 07:29:21 2018 -0700

    GEODE-5252: Race in management adapter could fail to create MXBeans. (#1993)
    
    Fixed a race condition which causes the creation of MBeans fail
    while handling resource lifecycle change notifications.
      * A read-write lock is added to synchronize between handling notifications
        of cache creation/removal and handling other notifications.
      * Added a test which to test the synchronization for a fixed amount of time.
---
 .../internal/beans/ManagementAdapter.java          |  54 ++---
 .../internal/beans/ManagementListener.java         | 243 +++++++++++----------
 .../internal/beans/ManagementAdapterTest.java      | 160 ++++++++++++++
 3 files changed, 318 insertions(+), 139 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementAdapter.java
b/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementAdapter.java
index 21bd6e6..732217e 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementAdapter.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementAdapter.java
@@ -135,7 +135,7 @@ public class ManagementAdapter {
    *
    * @param cache gemfire cache
    */
-  public void handleCacheCreation(InternalCache cache) throws ManagementException {
+  protected void handleCacheCreation(InternalCache cache) throws ManagementException {
     try {
       this.internalCache = cache;
       this.service =
@@ -193,7 +193,7 @@ public class ManagementAdapter {
   /**
    * Handles all the distributed mbean creation part when a Manager is started
    */
-  public void handleManagerStart() throws ManagementException {
+  protected void handleManagerStart() throws ManagementException {
     if (!isServiceInitialised("handleManagerStart")) {
       return;
     }
@@ -255,7 +255,7 @@ public class ManagementAdapter {
    * Handles all the clean up activities when a Manager is stopped It clears the distributed
mbeans
    * and underlying data structures
    */
-  public void handleManagerStop() throws ManagementException {
+  protected void handleManagerStop() throws ManagementException {
     if (!isServiceInitialised("handleManagerStop")) {
       return;
     }
@@ -312,7 +312,7 @@ public class ManagementAdapter {
   /**
    * Assumption is always cache and MemberMbean has been will be created first
    */
-  public void handleManagerCreation() throws ManagementException {
+  protected void handleManagerCreation() throws ManagementException {
     if (!isServiceInitialised("handleManagerCreation")) {
       return;
     }
@@ -367,7 +367,7 @@ public class ManagementAdapter {
    *
    * @param disk the disk store for which the call back is invoked
    */
-  public void handleDiskCreation(DiskStore disk) throws ManagementException {
+  protected void handleDiskCreation(DiskStore disk) throws ManagementException {
     if (!isServiceInitialised("handleDiskCreation")) {
       return;
     }
@@ -390,7 +390,7 @@ public class ManagementAdapter {
    * Handles LockService Creation
    *
    */
-  public void handleLockServiceCreation(DLockService lockService) throws ManagementException
{
+  protected void handleLockServiceCreation(DLockService lockService) throws ManagementException
{
     if (!isServiceInitialised("handleLockServiceCreation")) {
       return;
     }
@@ -422,7 +422,7 @@ public class ManagementAdapter {
    *
    * @param sender the specific gateway sender
    */
-  public void handleGatewaySenderCreation(GatewaySender sender) throws ManagementException
{
+  protected void handleGatewaySenderCreation(GatewaySender sender) throws ManagementException
{
     if (!isServiceInitialised("handleGatewaySenderCreation")) {
       return;
     }
@@ -447,7 +447,7 @@ public class ManagementAdapter {
    *
    * @param recv specific gateway receiver
    */
-  public void handleGatewayReceiverCreate(GatewayReceiver recv) throws ManagementException
{
+  protected void handleGatewayReceiverCreate(GatewayReceiver recv) throws ManagementException
{
     if (!isServiceInitialised("handleGatewayReceiverCreate")) {
       return;
     }
@@ -480,7 +480,7 @@ public class ManagementAdapter {
    *
    * @param recv specific gateway receiver
    */
-  public void handleGatewayReceiverDestroy(GatewayReceiver recv) throws ManagementException
{
+  protected void handleGatewayReceiverDestroy(GatewayReceiver recv) throws ManagementException
{
     if (!isServiceInitialised("handleGatewayReceiverDestroy")) {
       return;
     }
@@ -504,7 +504,7 @@ public class ManagementAdapter {
    *
    * @param recv specific gateway receiver
    */
-  public void handleGatewayReceiverStart(GatewayReceiver recv) throws ManagementException
{
+  protected void handleGatewayReceiverStart(GatewayReceiver recv) throws ManagementException
{
     if (!isServiceInitialised("handleGatewayReceiverStart")) {
       return;
     }
@@ -529,7 +529,7 @@ public class ManagementAdapter {
    *
    * @param recv specific gateway receiver
    */
-  public void handleGatewayReceiverStop(GatewayReceiver recv) throws ManagementException
{
+  protected void handleGatewayReceiverStop(GatewayReceiver recv) throws ManagementException
{
     if (!isServiceInitialised("handleGatewayReceiverStop")) {
       return;
     }
@@ -544,7 +544,7 @@ public class ManagementAdapter {
     memberLevelNotifEmitter.sendNotification(notification);
   }
 
-  public void handleAsyncEventQueueCreation(AsyncEventQueue queue) throws ManagementException
{
+  protected void handleAsyncEventQueueCreation(AsyncEventQueue queue) throws ManagementException
{
     if (!isServiceInitialised("handleAsyncEventQueueCreation")) {
       return;
     }
@@ -568,7 +568,7 @@ public class ManagementAdapter {
    *
    * @param queue The AsyncEventQueue being removed
    */
-  public void handleAsyncEventQueueRemoval(AsyncEventQueue queue) throws ManagementException
{
+  protected void handleAsyncEventQueueRemoval(AsyncEventQueue queue) throws ManagementException
{
     if (!isServiceInitialised("handleAsyncEventQueueRemoval")) {
       return;
     }
@@ -604,7 +604,7 @@ public class ManagementAdapter {
    * particular alert level
    *
    */
-  public void handleSystemNotification(AlertDetails details) {
+  protected void handleSystemNotification(AlertDetails details) {
     if (!isServiceInitialised("handleSystemNotification")) {
       return;
     }
@@ -647,7 +647,7 @@ public class ManagementAdapter {
    *
    * @param cacheServer cache server instance
    */
-  public void handleCacheServerStart(CacheServer cacheServer) {
+  protected void handleCacheServerStart(CacheServer cacheServer) {
     if (!isServiceInitialised("handleCacheServerStart")) {
       return;
     }
@@ -685,7 +685,7 @@ public class ManagementAdapter {
    *
    * @param server cache server instance
    */
-  public void handleCacheServerStop(CacheServer server) {
+  protected void handleCacheServerStop(CacheServer server) {
     if (!isServiceInitialised("handleCacheServerStop")) {
       return;
     }
@@ -718,7 +718,7 @@ public class ManagementAdapter {
    *
    * @param cache GemFire Cache instance. For now client cache is not supported
    */
-  public void handleCacheRemoval(Cache cache) throws ManagementException {
+  protected void handleCacheRemoval(Cache cache) throws ManagementException {
     if (!isServiceInitialised("handleCacheRemoval")) {
       return;
     }
@@ -793,7 +793,7 @@ public class ManagementAdapter {
    * Handles particular region destroy or close operation it will remove the corresponding
MBean
    *
    */
-  public void handleRegionRemoval(Region region) throws ManagementException {
+  protected void handleRegionRemoval(Region region) throws ManagementException {
     if (!isServiceInitialised("handleRegionRemoval")) {
       return;
     }
@@ -835,7 +835,7 @@ public class ManagementAdapter {
    * Handles DiskStore Removal
    *
    */
-  public void handleDiskRemoval(DiskStore disk) throws ManagementException {
+  protected void handleDiskRemoval(DiskStore disk) throws ManagementException {
     if (!isServiceInitialised("handleDiskRemoval")) {
       return;
     }
@@ -873,7 +873,7 @@ public class ManagementAdapter {
    *
    * @param lockService lock service instance
    */
-  public void handleLockServiceRemoval(DLockService lockService) throws ManagementException
{
+  protected void handleLockServiceRemoval(DLockService lockService) throws ManagementException
{
     if (!isServiceInitialised("handleLockServiceRemoval")) {
       return;
     }
@@ -900,7 +900,7 @@ public class ManagementAdapter {
    *
    * @param locator instance of locator which is getting started
    */
-  public void handleLocatorStart(Locator locator) throws ManagementException {
+  protected void handleLocatorStart(Locator locator) throws ManagementException {
     if (!isServiceInitialised("handleLocatorCreation")) {
       return;
     }
@@ -923,7 +923,7 @@ public class ManagementAdapter {
 
   }
 
-  public void handleGatewaySenderStart(GatewaySender sender) throws ManagementException {
+  protected void handleGatewaySenderStart(GatewaySender sender) throws ManagementException
{
     if (!isServiceInitialised("handleGatewaySenderStart")) {
       return;
     }
@@ -942,7 +942,7 @@ public class ManagementAdapter {
     memberLevelNotifEmitter.sendNotification(notification);
   }
 
-  public void handleGatewaySenderStop(GatewaySender sender) throws ManagementException {
+  protected void handleGatewaySenderStop(GatewaySender sender) throws ManagementException
{
     if (!isServiceInitialised("handleGatewaySenderStop")) {
       return;
     }
@@ -954,7 +954,7 @@ public class ManagementAdapter {
     memberLevelNotifEmitter.sendNotification(notification);
   }
 
-  public void handleGatewaySenderPaused(GatewaySender sender) throws ManagementException
{
+  protected void handleGatewaySenderPaused(GatewaySender sender) throws ManagementException
{
     if (!isServiceInitialised("handleGatewaySenderPaused")) {
       return;
     }
@@ -966,7 +966,7 @@ public class ManagementAdapter {
     memberLevelNotifEmitter.sendNotification(notification);
   }
 
-  public void handleGatewaySenderResumed(GatewaySender sender) throws ManagementException
{
+  protected void handleGatewaySenderResumed(GatewaySender sender) throws ManagementException
{
     if (!isServiceInitialised("handleGatewaySenderResumed")) {
       return;
     }
@@ -978,7 +978,7 @@ public class ManagementAdapter {
     memberLevelNotifEmitter.sendNotification(notification);
   }
 
-  public void handleGatewaySenderRemoved(GatewaySender sender) throws ManagementException
{
+  protected void handleGatewaySenderRemoved(GatewaySender sender) throws ManagementException
{
     if (!isServiceInitialised("handleGatewaySenderRemoved")) {
       return;
     }
@@ -1000,7 +1000,7 @@ public class ManagementAdapter {
     memberLevelNotifEmitter.sendNotification(notification);
   }
 
-  public void handleCacheServiceCreation(CacheService cacheService) throws ManagementException
{
+  protected void handleCacheServiceCreation(CacheService cacheService) throws ManagementException
{
     if (!isServiceInitialised("handleCacheServiceCreation")) {
       return;
     }
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementListener.java
b/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementListener.java
index 5c2bcd7..17d1951 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementListener.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementListener.java
@@ -14,6 +14,8 @@
  */
 package org.apache.geode.management.internal.beans;
 
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 import org.apache.geode.cache.DiskStore;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
@@ -45,6 +47,10 @@ public class ManagementListener implements ResourceEventsListener {
 
   private LogWriterI18n logger;
 
+  // having a readwrite lock to synchronize between handling cache creation/removal vs handling
+  // other notifications
+  private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+
   /**
    * Constructor
    */
@@ -100,118 +106,131 @@ public class ManagementListener implements ResourceEventsListener
{
     if (!shouldProceed(event)) {
       return;
     }
-    switch (event) {
-      case CACHE_CREATE:
-        InternalCache createdCache = (InternalCache) resource;
-        adapter.handleCacheCreation(createdCache);
-        break;
-      case CACHE_REMOVE:
-        InternalCache removedCache = (InternalCache) resource;
-        adapter.handleCacheRemoval(removedCache);
-        break;
-      case REGION_CREATE:
-        Region createdRegion = (Region) resource;
-        adapter.handleRegionCreation(createdRegion);
-        break;
-      case REGION_REMOVE:
-        Region removedRegion = (Region) resource;
-        adapter.handleRegionRemoval(removedRegion);
-        break;
-      case DISKSTORE_CREATE:
-        DiskStore createdDisk = (DiskStore) resource;
-        adapter.handleDiskCreation(createdDisk);
-        break;
-      case DISKSTORE_REMOVE:
-        DiskStore removedDisk = (DiskStore) resource;
-        adapter.handleDiskRemoval(removedDisk);
-        break;
-      case GATEWAYRECEIVER_CREATE:
-        GatewayReceiver createdRecv = (GatewayReceiver) resource;
-        adapter.handleGatewayReceiverCreate(createdRecv);
-        break;
-      case GATEWAYRECEIVER_DESTROY:
-        GatewayReceiver destroyedRecv = (GatewayReceiver) resource;
-        adapter.handleGatewayReceiverDestroy(destroyedRecv);
-        break;
-      case GATEWAYRECEIVER_START:
-        GatewayReceiver startedRecv = (GatewayReceiver) resource;
-        adapter.handleGatewayReceiverStart(startedRecv);
-        break;
-      case GATEWAYRECEIVER_STOP:
-        GatewayReceiver stoppededRecv = (GatewayReceiver) resource;
-        adapter.handleGatewayReceiverStop(stoppededRecv);
-        break;
-      case GATEWAYSENDER_CREATE:
-        GatewaySender sender = (GatewaySender) resource;
-        adapter.handleGatewaySenderCreation(sender);
-        break;
-      case GATEWAYSENDER_START:
-        GatewaySender startedSender = (GatewaySender) resource;
-        adapter.handleGatewaySenderStart(startedSender);
-        break;
-      case GATEWAYSENDER_STOP:
-        GatewaySender stoppedSender = (GatewaySender) resource;
-        adapter.handleGatewaySenderStop(stoppedSender);
-        break;
-      case GATEWAYSENDER_PAUSE:
-        GatewaySender pausedSender = (GatewaySender) resource;
-        adapter.handleGatewaySenderPaused(pausedSender);
-        break;
-      case GATEWAYSENDER_RESUME:
-        GatewaySender resumedSender = (GatewaySender) resource;
-        adapter.handleGatewaySenderResumed(resumedSender);
-        break;
-      case GATEWAYSENDER_REMOVE:
-        GatewaySender removedSender = (GatewaySender) resource;
-        adapter.handleGatewaySenderRemoved(removedSender);
-        break;
-      case LOCKSERVICE_CREATE:
-        DLockService createdLockService = (DLockService) resource;
-        adapter.handleLockServiceCreation(createdLockService);
-        break;
-      case LOCKSERVICE_REMOVE:
-        DLockService removedLockService = (DLockService) resource;
-        adapter.handleLockServiceRemoval(removedLockService);
-        break;
-      case MANAGER_CREATE:
-        adapter.handleManagerCreation();
-        break;
-      case MANAGER_START:
-        adapter.handleManagerStart();
-        break;
-      case MANAGER_STOP:
-        adapter.handleManagerStop();
-        break;
-      case ASYNCEVENTQUEUE_CREATE:
-        AsyncEventQueue queue = (AsyncEventQueue) resource;
-        adapter.handleAsyncEventQueueCreation(queue);
-        break;
-      case ASYNCEVENTQUEUE_REMOVE:
-        AsyncEventQueue removedQueue = (AsyncEventQueue) resource;
-        adapter.handleAsyncEventQueueRemoval(removedQueue);
-        break;
-      case SYSTEM_ALERT:
-        AlertDetails details = (AlertDetails) resource;
-        adapter.handleSystemNotification(details);
-        break;
-      case CACHE_SERVER_START:
-        CacheServer startedServer = (CacheServer) resource;
-        adapter.handleCacheServerStart(startedServer);
-        break;
-      case CACHE_SERVER_STOP:
-        CacheServer stoppedServer = (CacheServer) resource;
-        adapter.handleCacheServerStop(stoppedServer);
-        break;
-      case LOCATOR_START:
-        Locator loc = (Locator) resource;
-        adapter.handleLocatorStart(loc);
-        break;
-      case CACHE_SERVICE_CREATE:
-        CacheService service = (CacheService) resource;
-        adapter.handleCacheServiceCreation(service);
-        break;
-      default:
-        break;
+    try {
+      if (event == ResourceEvent.CACHE_CREATE || event == ResourceEvent.CACHE_REMOVE) {
+        readWriteLock.writeLock().lock();
+      } else {
+        readWriteLock.readLock().lock();
+      }
+      switch (event) {
+        case CACHE_CREATE:
+          InternalCache createdCache = (InternalCache) resource;
+          adapter.handleCacheCreation(createdCache);
+          break;
+        case CACHE_REMOVE:
+          InternalCache removedCache = (InternalCache) resource;
+          adapter.handleCacheRemoval(removedCache);
+          break;
+        case REGION_CREATE:
+          Region createdRegion = (Region) resource;
+          adapter.handleRegionCreation(createdRegion);
+          break;
+        case REGION_REMOVE:
+          Region removedRegion = (Region) resource;
+          adapter.handleRegionRemoval(removedRegion);
+          break;
+        case DISKSTORE_CREATE:
+          DiskStore createdDisk = (DiskStore) resource;
+          adapter.handleDiskCreation(createdDisk);
+          break;
+        case DISKSTORE_REMOVE:
+          DiskStore removedDisk = (DiskStore) resource;
+          adapter.handleDiskRemoval(removedDisk);
+          break;
+        case GATEWAYRECEIVER_CREATE:
+          GatewayReceiver createdRecv = (GatewayReceiver) resource;
+          adapter.handleGatewayReceiverCreate(createdRecv);
+          break;
+        case GATEWAYRECEIVER_DESTROY:
+          GatewayReceiver destroyedRecv = (GatewayReceiver) resource;
+          adapter.handleGatewayReceiverDestroy(destroyedRecv);
+          break;
+        case GATEWAYRECEIVER_START:
+          GatewayReceiver startedRecv = (GatewayReceiver) resource;
+          adapter.handleGatewayReceiverStart(startedRecv);
+          break;
+        case GATEWAYRECEIVER_STOP:
+          GatewayReceiver stoppededRecv = (GatewayReceiver) resource;
+          adapter.handleGatewayReceiverStop(stoppededRecv);
+          break;
+        case GATEWAYSENDER_CREATE:
+          GatewaySender sender = (GatewaySender) resource;
+          adapter.handleGatewaySenderCreation(sender);
+          break;
+        case GATEWAYSENDER_START:
+          GatewaySender startedSender = (GatewaySender) resource;
+          adapter.handleGatewaySenderStart(startedSender);
+          break;
+        case GATEWAYSENDER_STOP:
+          GatewaySender stoppedSender = (GatewaySender) resource;
+          adapter.handleGatewaySenderStop(stoppedSender);
+          break;
+        case GATEWAYSENDER_PAUSE:
+          GatewaySender pausedSender = (GatewaySender) resource;
+          adapter.handleGatewaySenderPaused(pausedSender);
+          break;
+        case GATEWAYSENDER_RESUME:
+          GatewaySender resumedSender = (GatewaySender) resource;
+          adapter.handleGatewaySenderResumed(resumedSender);
+          break;
+        case GATEWAYSENDER_REMOVE:
+          GatewaySender removedSender = (GatewaySender) resource;
+          adapter.handleGatewaySenderRemoved(removedSender);
+          break;
+        case LOCKSERVICE_CREATE:
+          DLockService createdLockService = (DLockService) resource;
+          adapter.handleLockServiceCreation(createdLockService);
+          break;
+        case LOCKSERVICE_REMOVE:
+          DLockService removedLockService = (DLockService) resource;
+          adapter.handleLockServiceRemoval(removedLockService);
+          break;
+        case MANAGER_CREATE:
+          adapter.handleManagerCreation();
+          break;
+        case MANAGER_START:
+          adapter.handleManagerStart();
+          break;
+        case MANAGER_STOP:
+          adapter.handleManagerStop();
+          break;
+        case ASYNCEVENTQUEUE_CREATE:
+          AsyncEventQueue queue = (AsyncEventQueue) resource;
+          adapter.handleAsyncEventQueueCreation(queue);
+          break;
+        case ASYNCEVENTQUEUE_REMOVE:
+          AsyncEventQueue removedQueue = (AsyncEventQueue) resource;
+          adapter.handleAsyncEventQueueRemoval(removedQueue);
+          break;
+        case SYSTEM_ALERT:
+          AlertDetails details = (AlertDetails) resource;
+          adapter.handleSystemNotification(details);
+          break;
+        case CACHE_SERVER_START:
+          CacheServer startedServer = (CacheServer) resource;
+          adapter.handleCacheServerStart(startedServer);
+          break;
+        case CACHE_SERVER_STOP:
+          CacheServer stoppedServer = (CacheServer) resource;
+          adapter.handleCacheServerStop(stoppedServer);
+          break;
+        case LOCATOR_START:
+          Locator loc = (Locator) resource;
+          adapter.handleLocatorStart(loc);
+          break;
+        case CACHE_SERVICE_CREATE:
+          CacheService service = (CacheService) resource;
+          adapter.handleCacheServiceCreation(service);
+          break;
+        default:
+          break;
+      }
+    } finally {
+      if (event == ResourceEvent.CACHE_CREATE || event == ResourceEvent.CACHE_REMOVE) {
+        readWriteLock.writeLock().unlock();
+      } else {
+        readWriteLock.readLock().unlock();
+      }
     }
   }
 
diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/beans/ManagementAdapterTest.java
b/geode-core/src/test/java/org/apache/geode/management/internal/beans/ManagementAdapterTest.java
new file mode 100644
index 0000000..c44473c
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/management/internal/beans/ManagementAdapterTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information
regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version
2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain
a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under
the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express
+ * or implied. See the License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+package org.apache.geode.management.internal.beans;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.doReturn;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Scanner;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.distributed.internal.ResourceEvent;
+import org.apache.geode.internal.cache.DiskStoreImpl;
+import org.apache.geode.internal.cache.DiskStoreStats;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.test.junit.categories.IntegrationTest;
+import org.apache.geode.test.junit.rules.ServerStarterRule;
+
+@Category(IntegrationTest.class)
+public class ManagementAdapterTest {
+
+  private InternalCache cache = null;
+  private DiskStoreImpl diskStore = mock(DiskStoreImpl.class);
+  private volatile boolean race = false;
+
+  @Rule
+  public ServerStarterRule serverRule =
+      new ServerStarterRule().withWorkingDir().withLogFile().withAutoStart();
+
+  @Before
+  public void before() {
+    cache = serverRule.getCache();
+    doReturn(new DiskStoreStats(cache.getInternalDistributedSystem(), "disk-stats")).when(diskStore)
+        .getStats();
+    doReturn(new File[] {}).when(diskStore).getDiskDirs();
+  }
+
+  @Test
+  public void testHandlingNotificationsConcurrently() throws InterruptedException {
+    /*
+     * Tests to see if there are any concurrency issues handling resource lifecycle events.
+     *
+     * There are three runnables with specific tasks as below:
+     * r1 - continuously send cache creation/removal notifications, thread modifying the
state
+     * r2 - continuously send disk creation/removal, thread relying on state
+     * r3 - monitors log to see if there is a null pointer due race'
+     *
+     * Test runs at most 2 seconds or until a race.
+     */
+
+    Runnable r1 = () -> {
+      while (!race) {
+        try {
+          cache.getInternalDistributedSystem().handleResourceEvent(ResourceEvent.CACHE_REMOVE,
+              cache);
+          Thread.sleep(10);
+          cache.getInternalDistributedSystem().handleResourceEvent(ResourceEvent.CACHE_CREATE,
+              cache);
+          Thread.sleep(10);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+      }
+    };
+
+    Runnable r2 = () -> {
+      while (!race) {
+        try {
+          cache.getInternalDistributedSystem().handleResourceEvent(ResourceEvent.DISKSTORE_CREATE,
+              diskStore);
+          Thread.sleep(5);
+          cache.getInternalDistributedSystem().handleResourceEvent(ResourceEvent.DISKSTORE_REMOVE,
+              diskStore);
+          Thread.sleep(5);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+      }
+    };
+
+    // r3 scans server log to see if there is null pointer due to caused by cache removal.
+    Runnable r3 = () -> {
+      while (!race) {
+        try {
+          File logFile = new File(serverRule.getWorkingDir() + "/server.log");
+          Scanner scanner = new Scanner(logFile);
+          while (scanner.hasNextLine()) {
+            final String lineFromFile = scanner.nextLine();
+            if (lineFromFile.contains("java.lang.NullPointerException")) {
+              race = true;
+              break;
+            }
+          }
+        } catch (FileNotFoundException e) {
+          // ignore this exception as the temp file might have been deleted after timeout
+        }
+      }
+    };
+
+    List<Runnable> runnables = Arrays.asList(r1, r2, r3);
+
+    final int numThreads = runnables.size();
+    final List<Throwable> exceptions = Collections.synchronizedList(new ArrayList<Throwable>());
+    final ExecutorService threadPool = Executors.newFixedThreadPool(numThreads);
+    try {
+      final CountDownLatch allExecutorThreadsReady = new CountDownLatch(numThreads);
+      final CountDownLatch afterInitBlocker = new CountDownLatch(1);
+      final CountDownLatch allDone = new CountDownLatch(numThreads);
+      for (final Runnable submittedTestRunnable : runnables) {
+        threadPool.submit(() -> {
+          allExecutorThreadsReady.countDown();
+          try {
+            afterInitBlocker.await();
+            submittedTestRunnable.run();
+          } catch (final Throwable e) {
+            exceptions.add(e);
+          } finally {
+            allDone.countDown();
+          }
+        });
+      }
+      // wait until all threads are ready
+      allExecutorThreadsReady.await(runnables.size() * 10, TimeUnit.MILLISECONDS);
+      // start all test runners
+      afterInitBlocker.countDown();
+      // wait until all done or timeout
+      allDone.await(2, TimeUnit.SECONDS);
+    } finally {
+      threadPool.shutdownNow();
+    }
+    assertThat(exceptions).as("failed with exception(s)" + exceptions).isEmpty();
+    assertThat(race).as("is service to be null due to race").isEqualTo(false);
+  }
+}

-- 
To stop receiving notification emails like this one, please contact
sai_boorlagadda@apache.org.

Mime
View raw message