helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ka...@apache.org
Subject git commit: [HELIX-495] Make TestPreferenceListAsQueue non-flaky
Date Tue, 05 Aug 2014 18:15:28 GMT
Repository: helix
Updated Branches:
  refs/heads/helix-0.6.x bb15ca8f0 -> 1a4bec71f


[HELIX-495] Make TestPreferenceListAsQueue non-flaky


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/1a4bec71
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/1a4bec71
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/1a4bec71

Branch: refs/heads/helix-0.6.x
Commit: 1a4bec71f82285b32820c14e6e68de96dd4d9372
Parents: bb15ca8
Author: Kanak Biscuitwala <kanak@apache.org>
Authored: Mon Aug 4 14:43:05 2014 -0700
Committer: Kanak Biscuitwala <kanak@apache.org>
Committed: Tue Aug 5 11:14:38 2014 -0700

----------------------------------------------------------------------
 .../integration/TestPreferenceListAsQueue.java  | 172 +++++++++++--------
 1 file changed, 97 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/1a4bec71/helix-core/src/test/java/org/apache/helix/integration/TestPreferenceListAsQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestPreferenceListAsQueue.java b/helix-core/src/test/java/org/apache/helix/integration/TestPreferenceListAsQueue.java
index 633d046..456baca 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestPreferenceListAsQueue.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestPreferenceListAsQueue.java
@@ -21,6 +21,9 @@ package org.apache.helix.integration;
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import org.I0Itec.zkclient.DataUpdater;
 import org.apache.helix.AccessOption;
@@ -55,20 +58,23 @@ import com.google.common.collect.Lists;
 public class TestPreferenceListAsQueue extends ZkUnitTestBase {
   private static final Logger LOG = Logger.getLogger(TestPreferenceListAsQueue.class);
   private static final int TRANSITION_TIME = 500;
-  private static final int WAIT_TIME = TRANSITION_TIME + (TRANSITION_TIME / 2);
   private static final int PARALLELISM = 1;
 
   private List<String> _instanceList;
+  private Queue<List<String>> _prefListHistory;
   private String _clusterName;
   private String _stateModel;
   private ClusterSetup _clusterSetup;
   private HelixAdmin _admin;
+  private CountDownLatch _onlineLatch;
+  private CountDownLatch _offlineLatch;
 
   @BeforeMethod
   public void beforeMethod() {
     _instanceList = Lists.newLinkedList();
     _clusterSetup = new ClusterSetup(ZK_ADDR);
     _admin = _clusterSetup.getClusterManagementTool();
+    _prefListHistory = Lists.newLinkedList();
 
     // Create cluster
     String className = TestHelper.getTestClassName();
@@ -163,6 +169,9 @@ public class TestPreferenceListAsQueue extends ZkUnitTestBase {
         HelixManagerFactory.getZKHelixManager(_clusterName, null, InstanceType.CONTROLLER, ZK_ADDR);
     controller.connect();
 
+    // Disable controller immediately
+    _admin.enableCluster(_clusterName, false);
+
     // This resource only has 1 partition
     String partitionName = RESOURCE_NAME + "_" + 0;
 
@@ -188,26 +197,47 @@ public class TestPreferenceListAsQueue extends ZkUnitTestBase {
     Assert.assertTrue(preferenceListIsCorrect(_admin, _clusterName, RESOURCE_NAME, partitionName,
         Arrays.asList("localhost_1", "localhost_2")));
 
-    // Now wait for the first instance to be done
-    Thread.sleep(WAIT_TIME);
-    Assert.assertTrue(preferenceListIsCorrect(_admin, _clusterName, RESOURCE_NAME, partitionName,
-        Arrays.asList("localhost_2", "")));
+    // Prepare for synchronization
+    _onlineLatch = new CountDownLatch(2);
+    _offlineLatch = new CountDownLatch(2);
+    _prefListHistory.clear();
+
+    // Now reenable the controller
+    _admin.enableCluster(_clusterName, true);
+
+    // Now wait for both instances to be done
+    boolean countReached = _onlineLatch.await(10000, TimeUnit.MILLISECONDS);
+    Assert.assertTrue(countReached);
+    List<String> top = _prefListHistory.poll();
+    Assert.assertTrue(top.equals(Arrays.asList("localhost_1", ""))
+        || top.equals(Arrays.asList("localhost_2", "")));
+    Assert.assertEquals(_prefListHistory.poll(), Arrays.asList("", ""));
 
-    // Add the first instance again; it should not exist
+    // Wait for everything to be fully offline
+    countReached = _offlineLatch.await(10000, TimeUnit.MILLISECONDS);
+    Assert.assertTrue(countReached);
+
+    // Add back the instances in the opposite order
+    _admin.enableCluster(_clusterName, false);
+    addInstanceToPreferences(participants[0].getHelixDataAccessor(),
+        participants[1].getInstanceName(), RESOURCE_NAME, Arrays.asList(partitionName));
     addInstanceToPreferences(participants[0].getHelixDataAccessor(),
         participants[0].getInstanceName(), RESOURCE_NAME, Arrays.asList(partitionName));
     Assert.assertTrue(preferenceListIsCorrect(_admin, _clusterName, RESOURCE_NAME, partitionName,
         Arrays.asList("localhost_2", "localhost_1")));
 
-    // Now wait for the second instance to be done
-    Thread.sleep(WAIT_TIME);
-    Assert.assertTrue(preferenceListIsCorrect(_admin, _clusterName, RESOURCE_NAME, partitionName,
-        Arrays.asList("localhost_1", "")));
-
-    // Now wait for the first instance to be done again
-    Thread.sleep(WAIT_TIME);
-    Assert.assertTrue(preferenceListIsCorrect(_admin, _clusterName, RESOURCE_NAME, partitionName,
-        Arrays.asList("", "")));
+    // Reset the latch
+    _onlineLatch = new CountDownLatch(2);
+    _prefListHistory.clear();
+    _admin.enableCluster(_clusterName, true);
+
+    // Now wait for both to be done again
+    countReached = _onlineLatch.await(10000, TimeUnit.MILLISECONDS);
+    Assert.assertTrue(countReached);
+    top = _prefListHistory.poll();
+    Assert.assertTrue(top.equals(Arrays.asList("localhost_1", ""))
+        || top.equals(Arrays.asList("localhost_2", "")));
+    Assert.assertEquals(_prefListHistory.poll(), Arrays.asList("", ""));
     Assert.assertEquals(_instanceList.size(), 0);
 
     // Cleanup
@@ -277,37 +307,32 @@ public class TestPreferenceListAsQueue extends ZkUnitTestBase {
    * @param resourceName
    * @param partitionName
    */
-  private static void removeInstanceFromPreferences(HelixDataAccessor accessor,
-      final String instanceName, final String resourceName, final String partitionName) {
-    // Updater for ideal state
+  private void removeInstanceFromPreferences(HelixDataAccessor accessor, final String instanceName,
+      final String resourceName, final String partitionName) {
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
     String idealStatePath = keyBuilder.idealStates(resourceName).getPath();
-    DataUpdater<ZNRecord> idealStateUpdater = new DataUpdater<ZNRecord>() {
-      @Override
-      public ZNRecord update(ZNRecord currentData) {
-        List<String> preferenceList = currentData.getListField(partitionName);
-        int numReplicas =
-            Integer.valueOf(currentData.getSimpleField(IdealStateProperty.REPLICAS.toString()));
-        currentData.setListField(partitionName,
-            removeInstanceFromPreferenceList(preferenceList, instanceName, numReplicas));
-        return currentData;
-      }
-    };
-
-    // Updater for instance config
-    String instanceConfigPath = keyBuilder.instanceConfig(instanceName).getPath();
-    DataUpdater<ZNRecord> instanceConfigUpdater = new DataUpdater<ZNRecord>() {
-      @Override
-      public ZNRecord update(ZNRecord currentData) {
-        // currentData.setBooleanField(InstanceConfigProperty.HELIX_ENABLED.toString(), false);
-        return currentData;
-      }
-    };
-    List<DataUpdater<ZNRecord>> updaters = Lists.newArrayList();
-    updaters.add(idealStateUpdater);
-    updaters.add(instanceConfigUpdater);
-    accessor.updateChildren(Arrays.asList(idealStatePath, instanceConfigPath), updaters,
-        AccessOption.PERSISTENT);
+    synchronized (_prefListHistory) {
+      // Updater for ideal state
+      final List<String> prefList = Lists.newLinkedList();
+      DataUpdater<ZNRecord> idealStateUpdater = new DataUpdater<ZNRecord>() {
+        @Override
+        public ZNRecord update(ZNRecord currentData) {
+          List<String> preferenceList = currentData.getListField(partitionName);
+          int numReplicas =
+              Integer.valueOf(currentData.getSimpleField(IdealStateProperty.REPLICAS.toString()));
+          List<String> newPrefList =
+              removeInstanceFromPreferenceList(preferenceList, instanceName, numReplicas);
+          currentData.setListField(partitionName, newPrefList);
+          prefList.clear();
+          prefList.addAll(newPrefList);
+          return currentData;
+        }
+      };
+      List<DataUpdater<ZNRecord>> updaters = Lists.newArrayList();
+      updaters.add(idealStateUpdater);
+      accessor.updateChildren(Arrays.asList(idealStatePath), updaters, AccessOption.PERSISTENT);
+      _prefListHistory.add(prefList);
+    }
   }
 
   /**
@@ -318,41 +343,36 @@ public class TestPreferenceListAsQueue extends ZkUnitTestBase {
    * @param resourceName
    * @param partitions
    */
-  private static void addInstanceToPreferences(HelixDataAccessor accessor,
-      final String instanceName, final String resourceName, final List<String> partitions) {
-    // Updater for ideal state
+  private void addInstanceToPreferences(HelixDataAccessor accessor, final String instanceName,
+      final String resourceName, final List<String> partitions) {
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
     String idealStatePath = keyBuilder.idealStates(resourceName).getPath();
-    DataUpdater<ZNRecord> idealStateUpdater = new DataUpdater<ZNRecord>() {
-      @Override
-      public ZNRecord update(ZNRecord currentData) {
-        for (String partitionName : partitions) {
-          List<String> preferenceList = currentData.getListField(partitionName);
-          int numReplicas =
-              Integer.valueOf(currentData.getSimpleField(IdealStateProperty.REPLICAS.toString()));
-          currentData.setListField(partitionName,
-              addInstanceToPreferenceList(preferenceList, instanceName, numReplicas));
+    synchronized (_prefListHistory) {
+      // Updater for ideal state
+      final List<String> prefList = Lists.newLinkedList();
+      DataUpdater<ZNRecord> idealStateUpdater = new DataUpdater<ZNRecord>() {
+        @Override
+        public ZNRecord update(ZNRecord currentData) {
+          for (String partitionName : partitions) {
+            List<String> preferenceList = currentData.getListField(partitionName);
+            int numReplicas =
+                Integer.valueOf(currentData.getSimpleField(IdealStateProperty.REPLICAS.toString()));
+            List<String> newPrefList =
+                addInstanceToPreferenceList(preferenceList, instanceName, numReplicas);
+            currentData.setListField(partitionName, newPrefList);
+            prefList.clear();
+            prefList.addAll(newPrefList);
+          }
+          return currentData;
         }
-        return currentData;
-      }
-    };
+      };
 
-    // Updater for instance config
-    String instanceConfigPath = keyBuilder.instanceConfig(instanceName).getPath();
-    DataUpdater<ZNRecord> instanceConfigUpdater = new DataUpdater<ZNRecord>() {
-      @Override
-      public ZNRecord update(ZNRecord currentData) {
-        // currentData.setBooleanField(InstanceConfigProperty.HELIX_ENABLED.toString(), true);
-        return currentData;
-      }
-    };
-
-    // Send update requests together
-    List<DataUpdater<ZNRecord>> updaters = Lists.newArrayList();
-    updaters.add(idealStateUpdater);
-    updaters.add(instanceConfigUpdater);
-    accessor.updateChildren(Arrays.asList(idealStatePath, instanceConfigPath), updaters,
-        AccessOption.PERSISTENT);
+      // Send update requests together
+      List<DataUpdater<ZNRecord>> updaters = Lists.newArrayList();
+      updaters.add(idealStateUpdater);
+      accessor.updateChildren(Arrays.asList(idealStatePath), updaters, AccessOption.PERSISTENT);
+      _prefListHistory.add(prefList);
+    }
   }
 
   /**
@@ -456,6 +476,7 @@ public class TestPreferenceListAsQueue extends ZkUnitTestBase {
         newSize = _instanceList.size();
       }
       Assert.assertEquals(newSize, oldSize); // ensure nothing came in during this time
+      _onlineLatch.countDown();
     }
 
     public void onBecomeOfflineFromOnline(Message message, NotificationContext context) {
@@ -468,6 +489,7 @@ public class TestPreferenceListAsQueue extends ZkUnitTestBase {
       HelixManager manager = context.getManager();
       LOG.info("onBecomeDroppedFromOffline for " + message.getPartitionName() + " on "
           + manager.getInstanceName());
+      _offlineLatch.countDown();
     }
 
     public void onBecomeOfflineFromError(Message message, NotificationContext context) {


Mime
View raw message