helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zzh...@apache.org
Subject [1/2] git commit: [HELIX-132] current-state and external-view are not cleaned up when a resource has been removed, rb=21666
Date Wed, 16 Jul 2014 17:32:44 GMT
Repository: helix
Updated Branches:
  refs/heads/master a9e96ea06 -> 974f947b7


[HELIX-132] current-state and external-view are not cleaned up when a resource has been removed,
rb=21666


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/b470abab
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/b470abab
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/b470abab

Branch: refs/heads/master
Commit: b470ababa8727fc5534bc8dc00b3f4a3cd9076e8
Parents: 639f2f8
Author: zzhang <zzhang@apache.org>
Authored: Wed Jul 16 10:32:06 2014 -0700
Committer: zzhang <zzhang@apache.org>
Committed: Wed Jul 16 10:32:06 2014 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/helix/GroupCommit.java |  40 +++---
 .../stages/ExternalViewComputeStage.java        |   1 +
 .../stages/ResourceComputationStage.java        |   5 +
 .../helix/manager/zk/ZKHelixDataAccessor.java   |   2 +-
 .../org/apache/helix/integration/TestDrop.java  | 124 +++++++++++++++++--
 5 files changed, 133 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/b470abab/helix-core/src/main/java/org/apache/helix/GroupCommit.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/GroupCommit.java b/helix-core/src/main/java/org/apache/helix/GroupCommit.java
index 4d1bf68..f813aa9 100644
--- a/helix-core/src/main/java/org/apache/helix/GroupCommit.java
+++ b/helix-core/src/main/java/org/apache/helix/GroupCommit.java
@@ -53,10 +53,6 @@ public class GroupCommit {
 
   private final Queue[] _queues = new Queue[100];
 
-  // potential memory leak if we add resource and remove resource
-  // TODO: move the cache logic to data accessor
-  // private final Map<String, ZNRecord> _cache = new ConcurrentHashMap<String, ZNRecord>();
-
   /**
    * Set up a group committer and its associated queues
    */
@@ -81,6 +77,11 @@ public class GroupCommit {
    */
   public boolean commit(BaseDataAccessor<ZNRecord> accessor, int options, String key,
       ZNRecord record) {
+    return commit(accessor, options, key, record, false);
+  }
+
+  public boolean commit(BaseDataAccessor<ZNRecord> accessor, int options, String key,
+      ZNRecord record, boolean removeIfEmpty) {
     Queue queue = getQueue(key);
     Entry entry = new Entry(key, record);
 
@@ -98,7 +99,6 @@ public class GroupCommit {
           processed.add(first);
 
           String mergedKey = first._key;
-          // ZNRecord merged = _cache.get(mergedKey);
           ZNRecord merged = null;
 
           try {
@@ -113,24 +113,7 @@ public class GroupCommit {
            * value in ZK; use it as initial value if exists
            */
           if (merged == null) {
-            // ZNRecord valueOnZk = null;
-            // try
-            // {
-            // valueOnZk = accessor.get(mergedKey, null, 0);
-            // }
-            // catch(Exception e)
-            // {
-            // LOG.info(e);
-            // }
-            // if(valueOnZk != null)
-            // {
-            // merged = valueOnZk;
-            // merged.merge(first._record);
-            // }
-            // else // Zk path has null data. use the first record as initial record.
-            {
               merged = new ZNRecord(first._record);
-            }
           } else {
             merged.merge(first._record);
           }
@@ -145,9 +128,11 @@ public class GroupCommit {
             it.remove();
           }
           // System.out.println("size:"+ processed.size());
-          accessor.set(mergedKey, merged, options);
-          // accessor.set(mergedKey, merged, BaseDataAccessor.Option.PERSISTENT);
-          // _cache.put(mergedKey, merged);
+          if (removeIfEmpty && merged.getMapFields().isEmpty()) {
+            accessor.remove(mergedKey, options);
+          } else {
+            accessor.set(mergedKey, merged, options);
+          }
         } finally {
           queue._running.set(null);
           for (Entry e : processed) {
@@ -162,7 +147,10 @@ public class GroupCommit {
           try {
             entry.wait(10);
           } catch (InterruptedException e) {
-            e.printStackTrace();
+            LOG.error("Interrupted while committing change, key: " + key + ", record: " + record, e);
+
+            // Restore interrupt status
+            Thread.currentThread().interrupt();
             return false;
           }
         }

http://git-wip-us.apache.org/repos/asf/helix/blob/b470abab/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index 8c6b008..b540504 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@ -162,6 +162,7 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
     // remove dead external-views
     for (String resourceName : curExtViews.keySet()) {
       if (!resourceMap.containsKey(ResourceId.from(resourceName))) {
+        LOG.info("Remove externalView for resource: " + resourceName);
         dataAccessor.removeProperty(keyBuilder.externalView(resourceName));
       }
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/b470abab/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
index cbdf818..9f894e7 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
@@ -95,6 +95,11 @@ public class ResourceComputationStage extends AbstractBaseStage {
     for (Participant liveParticipant : cluster.getLiveParticipantMap().values()) {
       for (ResourceId resourceId : liveParticipant.getCurrentStateMap().keySet()) {
         CurrentState currentState = liveParticipant.getCurrentStateMap().get(resourceId);
+        Map<String, String> resourceStateMap = currentState.getPartitionStateMap();
+        if (resourceStateMap.isEmpty()) {
+          // skip empty current state for dropped resource
+          continue;
+        }
 
         if (currentState.getStateModelDefRef() == null) {
           LOG.error("state model def is null." + "resource:" + currentState.getResourceId()

http://git-wip-us.apache.org/repos/asf/helix/blob/b470abab/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
index e411a72..38c1417 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
@@ -139,7 +139,7 @@ public class ZKHelixDataAccessor implements HelixDataAccessor {
     boolean success = false;
     switch (type) {
     case CURRENTSTATES:
-      success = _groupCommit.commit(_baseDataAccessor, options, path, value.getRecord());
+      success = _groupCommit.commit(_baseDataAccessor, options, path, value.getRecord(), true);
       break;
     case STATUSUPDATES:
       if (LOG.isTraceEnabled()) {

http://git-wip-us.apache.org/repos/asf/helix/blob/b470abab/helix-core/src/test/java/org/apache/helix/integration/TestDrop.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDrop.java b/helix-core/src/test/java/org/apache/helix/integration/TestDrop.java
index a81e35b..8919634 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestDrop.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDrop.java
@@ -25,16 +25,19 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.PropertyKey;
-import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.HelixDefinedState;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.mock.participant.ErrTransition;
+import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
@@ -46,6 +49,79 @@ import org.testng.Assert;
 import org.testng.annotations.Test;
 
 public class TestDrop extends ZkIntegrationTestBase {
+
+  /**
+   * Assert externalView and currentState for each participant are empty
+   * @param clusterName
+   * @param db
+   * @param participants
+   */
+  private void assertEmptyCSandEV(String clusterName, String db, MockParticipantManager[] participants) {
+    HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    Assert.assertNull(accessor.getProperty(keyBuilder.externalView(db)));
+
+    for (MockParticipantManager participant : participants) {
+      String instanceName = participant.getInstanceName();
+      String sessionId = participant.getSessionId();
+      Assert.assertNull(accessor.getProperty(keyBuilder.currentState(instanceName, sessionId, db)));
+    }
+  }
+
+  @Test
+  public void testBasic() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    final int n = 5;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    MockParticipantManager[] participants = new MockParticipantManager[n];
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        8, // partitions per resource
+        n, // number of nodes
+        3, // replicas
+        "MasterSlave", true); // do rebalance
+
+    // start controller
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
+    controller.syncStart();
+
+    // start participants
+    for (int i = 0; i < n; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+      participants[i].syncStart();
+    }
+
+    boolean result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                clusterName));
+    Assert.assertTrue(result);
+
+    // Drop TestDB0
+    HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+    admin.dropResource(clusterName, "TestDB0");
+
+    result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                clusterName));
+    Assert.assertTrue(result);
+
+    assertEmptyCSandEV(clusterName, "TestDB0", participants);
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
   @Test
   public void testDropResourceWithErrorPartitionSemiAuto() throws Exception {
     // Logger.getRootLogger().setLevel(Level.INFO);
@@ -109,6 +185,8 @@ public class TestDrop extends ZkIntegrationTestBase {
             clusterName));
     Assert.assertTrue(result);
 
+    assertEmptyCSandEV(className, "TestDB0", participants);
+
     // clean up
     controller.syncStop();
     for (int i = 0; i < n; i++) {
@@ -183,13 +261,39 @@ public class TestDrop extends ZkIntegrationTestBase {
 
     ZKHelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
-    Builder keyBuilder = accessor.keyBuilder();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
     InstanceConfig config = accessor.getProperty(keyBuilder.instanceConfig("localhost_12918"));
     List<String> disabledPartitions = config.getDisabledPartitions();
     // System.out.println("disabledPartitions: " + disabledPartitions);
     Assert.assertEquals(disabledPartitions.size(), 1, "TestDB0_4 should be disabled");
     Assert.assertEquals(disabledPartitions.get(0), "TestDB0_4");
 
+    // ExternalView should have TestDB0_4->localhost_12918->ERROR
+    ExternalView ev = accessor.getProperty(keyBuilder.externalView("TestDB0"));
+    Set<String> partitions = ev.getPartitionSet();
+    Assert.assertEquals(partitions.size(), 1, "Should have TestDB0_4->localhost_12918->ERROR");
+    String errPartition = partitions.iterator().next();
+    Assert.assertEquals(errPartition, "TestDB0_4");
+    Map<String, String> stateMap = ev.getStateMap(errPartition);
+    Assert.assertEquals(stateMap.size(), 1);
+    Assert.assertEquals(stateMap.keySet().iterator().next(), "localhost_12918");
+    Assert.assertEquals(stateMap.get("localhost_12918"), HelixDefinedState.ERROR.name());
+
+    // localhost_12918 should have TestDB0_4 in ERROR state
+    CurrentState cs = accessor.getProperty(keyBuilder.currentState(participants[0].getInstanceName(),
+        participants[0].getSessionId(), "TestDB0"));
+    Map<String, String> partitionStateMap = cs.getPartitionStateMap();
+    Assert.assertEquals(partitionStateMap.size(), 1);
+    Assert.assertEquals(partitionStateMap.keySet().iterator().next(), "TestDB0_4");
+    Assert.assertEquals(partitionStateMap.get("TestDB0_4"), HelixDefinedState.ERROR.name());
+
+    // all other participants should have cleaned up empty current state
+    for (int i = 1; i < n; i++) {
+      String instanceName = participants[i].getInstanceName();
+      String sessionId = participants[i].getSessionId();
+      Assert.assertNull(accessor.getProperty(keyBuilder.currentState(instanceName, sessionId, "TestDB0")));
+    }
+
     // clean up
     controller.syncStop();
     for (int i = 0; i < n; i++) {
@@ -232,8 +336,8 @@ public class TestDrop extends ZkIntegrationTestBase {
 
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
-    Builder keyBuiler = accessor.keyBuilder();
-    accessor.setProperty(keyBuiler.idealStates("TestDB0"), isBuilder.build());
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    accessor.setProperty(keyBuilder.idealStates("TestDB0"), isBuilder.build());
 
     // start controller
     ClusterControllerManager controller =
@@ -275,6 +379,8 @@ public class TestDrop extends ZkIntegrationTestBase {
             clusterName));
     Assert.assertTrue(result, "Should be empty exeternal-view");
 
+    assertEmptyCSandEV(clusterName, "TestDB0", participants);
+
     // clean up
     controller.syncStop();
     for (int i = 0; i < n; i++) {
@@ -337,7 +443,7 @@ public class TestDrop extends ZkIntegrationTestBase {
     Assert.assertTrue(result);
 
     // drop schemata resource group
-    System.out.println("Dropping schemata resource group...");
+    // System.out.println("Dropping schemata resource group...");
     command = "--zkSvr " + ZK_ADDR + " --dropResource " + clusterName + " schemata";
     ClusterSetup.processCommandLineArgs(command.split("\\s+"));
     result =
@@ -345,13 +451,7 @@ public class TestDrop extends ZkIntegrationTestBase {
             clusterName));
     Assert.assertTrue(result);
 
-    // make sure schemata external view is empty
-    ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
-    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
-    Builder keyBuilder = accessor.keyBuilder();
-    ExternalView extView = accessor.getProperty(keyBuilder.externalView("schemata"));
-    Assert.assertEquals(extView.getPartitionSet().size(), 0,
-        "schemata externalView should be empty but was \"" + extView + "\"");
+    assertEmptyCSandEV(clusterName, "schemata", participants);
 
     // clean up
     controller.syncStop();


Mime
View raw message