incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [2/3] git commit: Fixing a bug in the ZooKeeperClusterStatus where changes made to the table descriptor are not read by other processes.
Date Sat, 08 Feb 2014 22:55:47 GMT
Fixing a bug in the ZooKeeperClusterStatus where changes made to the table descriptor are not
read by other processes.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/c8f1de11
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/c8f1de11
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/c8f1de11

Branch: refs/heads/apache-blur-0.2
Commit: c8f1de11fd908da44f1e30e64ad9f210b72e6057
Parents: ddf48ff
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Sat Feb 8 17:55:13 2014 -0500
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Sat Feb 8 17:55:13 2014 -0500

----------------------------------------------------------------------
 .../clusterstatus/ZookeeperClusterStatus.java   |  77 ++------
 .../ZookeeperClusterStatusTest.java             | 192 +++++++++++--------
 2 files changed, 121 insertions(+), 148 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/c8f1de11/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatus.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatus.java b/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatus.java
index aaa9bd0..64ee9a0 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatus.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatus.java
@@ -46,7 +46,6 @@ import org.apache.blur.utils.BlurUtil;
 import org.apache.blur.zookeeper.WatchChildren;
 import org.apache.blur.zookeeper.WatchChildren.OnChange;
 import org.apache.blur.zookeeper.WatchNodeData;
-import org.apache.blur.zookeeper.WatchNodeExistance;
 import org.apache.blur.zookeeper.ZkUtils;
 import org.apache.blur.zookeeper.ZooKeeperLockManager;
 import org.apache.zookeeper.KeeperException;
@@ -62,11 +61,9 @@ public class ZookeeperClusterStatus extends ClusterStatus {
   private final ZooKeeper _zk;
   private final BlurConfiguration _configuration;
   private final AtomicBoolean _running = new AtomicBoolean();
-  private final ConcurrentMap<String, Long> _safeModeMap = new ConcurrentHashMap<String, Long>();
   private final ConcurrentMap<String, List<String>> _onlineShardsNodes = new ConcurrentHashMap<String, List<String>>();
   private final ConcurrentMap<String, Set<String>> _tablesPerCluster = new ConcurrentHashMap<String, Set<String>>();
   private final AtomicReference<Set<String>> _clusters = new AtomicReference<Set<String>>(new HashSet<String>());
-  private final ConcurrentMap<String, Boolean> _enabled = new ConcurrentHashMap<String, Boolean>();
   private final Map<String, TableDescriptor> _tableDescriptorCache = new ConcurrentHashMap<String, TableDescriptor>();
 
   private final WatchChildren _clusterWatcher;
@@ -78,7 +75,8 @@ public class ZookeeperClusterStatus extends ClusterStatus {
   public ZookeeperClusterStatus(ZooKeeper zooKeeper, BlurConfiguration configuration) {
     _zk = zooKeeper;
     _running.set(true);
-    _clusterWatcher = watchForClusters();
+    _clusterWatcher = new WatchChildren(_zk, ZookeeperPathConstants.getClustersPath());
+    _clusterWatcher.watch(new Clusters());
     _configuration = configuration;
     try {
       Thread.sleep(1000);
@@ -112,7 +110,8 @@ public class ZookeeperClusterStatus extends ClusterStatus {
         if (!_tableWatchers.containsKey(cluster)) {
           String tablesPath = ZookeeperPathConstants.getTablesPath(cluster);
           ZkUtils.waitUntilExists(_zk, tablesPath);
-          WatchChildren clusterWatcher = new WatchChildren(_zk, tablesPath).watch(new Tables(cluster));
+          WatchChildren clusterWatcher = new WatchChildren(_zk, tablesPath);
+          clusterWatcher.watch(new Tables(cluster));
           _tableWatchers.put(cluster, clusterWatcher);
         }
       }
@@ -130,35 +129,6 @@ public class ZookeeperClusterStatus extends ClusterStatus {
     }
   }
 
-  class SafeExistance extends WatchNodeExistance.OnChange {
-
-    private String cluster;
-
-    public SafeExistance(String cluster) {
-      this.cluster = cluster;
-    }
-
-    @Override
-    public void action(Stat stat) {
-      if (stat != null) {
-        WatchNodeData watchNodeData = new WatchNodeData(_zk, ZookeeperPathConstants.getSafemodePath(cluster));
-        watchNodeData.watch(new WatchNodeData.OnChange() {
-          @Override
-          public void action(byte[] data) {
-            if (data == null) {
-              LOG.debug("Safe mode value for cluster [" + cluster + "] is not set.");
-              _safeModeMap.put(cluster, Long.MIN_VALUE);
-            } else {
-              String value = new String(data);
-              LOG.debug("Safe mode value for cluster [" + cluster + "] is [" + value + "].");
-              _safeModeMap.put(cluster, Long.parseLong(value));
-            }
-          }
-        });
-      }
-    }
-  }
-
   class Tables extends OnChange {
     private String cluster;
 
@@ -183,23 +153,12 @@ public class ZookeeperClusterStatus extends ClusterStatus {
       for (String table : newTables) {
         final String clusterTableKey = getClusterTableKey(cluster, table);
         WatchNodeData enabledWatcher = new WatchNodeData(_zk, ZookeeperPathConstants.getTablePath(cluster, table));
-        new WatchNodeData.OnChange() {
+        enabledWatcher.watch(new WatchNodeData.OnChange() {
           @Override
           public void action(byte[] data) {
-            if (data == null) {
-              _enabled.remove(clusterTableKey);
-            } else {
-              TDeserializer deserializer = new TDeserializer(new TJSONProtocol.Factory());
-              TableDescriptor tableDescriptor = new TableDescriptor();
-              try {
-                deserializer.deserialize(tableDescriptor, data);
-              } catch (TException e) {
-                throw new RuntimeException(e);
-              }
-              _enabled.put(clusterTableKey, tableDescriptor.isEnabled());
-            }
+            _tableDescriptorCache.clear();
           }
-        };
+        });
         if (_enabledWatchNodeExistance.putIfAbsent(clusterTableKey, enabledWatcher) != null) {
           enabledWatcher.close();
         }
@@ -226,10 +185,6 @@ public class ZookeeperClusterStatus extends ClusterStatus {
     }
   }
 
-  private WatchChildren watchForClusters() {
-    return new WatchChildren(_zk, ZookeeperPathConstants.getClustersPath()).watch(new Clusters());
-  }
-
   private String getClusterTableKey(String cluster, String table) {
     return cluster + "." + table;
   }
@@ -319,13 +274,13 @@ public class ZookeeperClusterStatus extends ClusterStatus {
   }
 
   private void watchForOnlineShardNodes(final String cluster) {
-    WatchChildren watch = new WatchChildren(_zk, ZookeeperPathConstants.getOnlineShardsPath(cluster))
-        .watch(new OnChange() {
-          @Override
-          public void action(List<String> children) {
-            _onlineShardsNodes.put(cluster, children);
-          }
-        });
+    WatchChildren watch = new WatchChildren(_zk, ZookeeperPathConstants.getOnlineShardsPath(cluster));
+    watch.watch(new OnChange() {
+      @Override
+      public void action(List<String> children) {
+        _onlineShardsNodes.put(cluster, children);
+      }
+    });
     if (_onlineShardsNodesWatchers.putIfAbsent(cluster, watch) != null) {
       // There was already a watch created. Close the extra watcher.
       watch.close();
@@ -531,7 +486,6 @@ public class ZookeeperClusterStatus extends ClusterStatus {
         return safeModeCacheEntry._safeMode;
       }
     }
-    long s = System.nanoTime();
     try {
       checkIfOpen();
       String safemodePath = ZookeeperPathConstants.getSafemodePath(cluster);
@@ -546,9 +500,6 @@ public class ZookeeperClusterStatus extends ClusterStatus {
       throw new RuntimeException(e);
     } catch (InterruptedException e) {
       throw new RuntimeException(e);
-    } finally {
-      long e = System.nanoTime();
-      LOG.debug("trace isInSafeMode took [" + (e - s) / 1000000.0 + " ms]");
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/c8f1de11/blur-core/src/test/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatusTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatusTest.java b/blur-core/src/test/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatusTest.java
index 3263b33..d5236f8 100644
--- a/blur-core/src/test/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatusTest.java
+++ b/blur-core/src/test/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatusTest.java
@@ -34,13 +34,11 @@ import org.apache.blur.log.LogFactory;
 import org.apache.blur.thrift.generated.TableDescriptor;
 import org.apache.blur.utils.BlurUtil;
 import org.apache.blur.zookeeper.ZooKeeperClient;
-import org.apache.zookeeper.CreateMode;
+import org.apache.blur.zookeeper.ZooKeeperLockManager;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
 import org.apache.zookeeper.server.quorum.QuorumPeerMain;
 import org.junit.After;
@@ -56,8 +54,10 @@ public class ZookeeperClusterStatusTest {
 
   private static final Log LOG = LogFactory.getLog(ZookeeperClusterStatusTest.class);
   private static MiniCluster miniCluster;
-  private ZooKeeper zooKeeper;
-  private ZookeeperClusterStatus clusterStatus;
+  private ZooKeeper zooKeeper1;
+  private ZooKeeper zooKeeper2;
+  private ZookeeperClusterStatus clusterStatus1;
+  private ZookeeperClusterStatus clusterStatus2;
 
   public static class QuorumPeerMainRun extends QuorumPeerMain {
     @Override
@@ -79,21 +79,32 @@ public class ZookeeperClusterStatusTest {
 
   @Before
   public void setup() throws KeeperException, InterruptedException, IOException {
-    zooKeeper = new ZooKeeperClient(miniCluster.getZkConnectionString(), 30000, new Watcher() {
+    zooKeeper1 = new ZooKeeperClient(miniCluster.getZkConnectionString(), 30000, new Watcher() {
       @Override
       public void process(WatchedEvent event) {
 
       }
     });
-    BlurUtil.setupZookeeper(zooKeeper, DEFAULT);
-    clusterStatus = new ZookeeperClusterStatus(zooKeeper);
+    BlurUtil.setupZookeeper(zooKeeper1, DEFAULT);
+    zooKeeper2 = new ZooKeeperClient(miniCluster.getZkConnectionString(), 30000, new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+
+      }
+    });
+    BlurUtil.setupZookeeper(zooKeeper1, DEFAULT);
+    BlurUtil.setupZookeeper(zooKeeper2, DEFAULT);
+    clusterStatus1 = new ZookeeperClusterStatus(zooKeeper1);
+    clusterStatus2 = new ZookeeperClusterStatus(zooKeeper2);
   }
 
   @After
   public void teardown() throws InterruptedException, KeeperException {
-    rmr(zooKeeper, "/blur");
-    clusterStatus.close();
-    zooKeeper.close();
+    clusterStatus1.close();
+    clusterStatus2.close();
+    rmr(zooKeeper1, "/blur");
+    zooKeeper1.close();
+    zooKeeper2.close();
   }
 
   private static void rmr(ZooKeeper zooKeeper, String path) throws KeeperException, InterruptedException {
@@ -107,67 +118,56 @@ public class ZookeeperClusterStatusTest {
   @Test
   public void testGetClusterList() {
     LOG.warn("testGetClusterList");
-    List<String> clusterList = clusterStatus.getClusterList(false);
+    List<String> clusterList = clusterStatus2.getClusterList(false);
     assertEquals(Arrays.asList(DEFAULT), clusterList);
   }
 
-  // @Test
-  // public void testSafeModeNotSet() throws KeeperException,
-  // InterruptedException {
-  // LOG.warn("testSafeModeNotSet");
-  // assertFalse(clusterStatus.isInSafeMode(false, DEFAULT));
-  // new WaitForAnswerToBeCorrect(20L) {
-  // @Override
-  // public Object run() {
-  // return clusterStatus.isInSafeMode(true, DEFAULT);
-  // }
-  // }.test(false);
-  // }
-  //
-  // @Test
-  // public void testSafeModeSetInPast() throws KeeperException,
-  // InterruptedException {
-  // LOG.warn("testSafeModeSetInPast");
-  // setSafeModeInPast();
-  // assertFalse(clusterStatus.isInSafeMode(false, DEFAULT));
-  // new WaitForAnswerToBeCorrect(20L) {
-  // @Override
-  // public Object run() {
-  // return clusterStatus.isInSafeMode(true, DEFAULT);
-  // }
-  // }.test(false);
-  // }
-  //
-  // @Test
-  // public void testSafeModeSetInFuture() throws KeeperException,
-  // InterruptedException {
-  // LOG.warn("testSafeModeSetInFuture");
-  // setSafeModeInFuture();
-  // assertTrue(clusterStatus.isInSafeMode(false, DEFAULT));
-  // new WaitForAnswerToBeCorrect(20L) {
-  // @Override
-  // public Object run() {
-  // return clusterStatus.isInSafeMode(true, DEFAULT);
-  // }
-  // }.test(true);
-  // }
+  @Test
+  public void testSafeModeNoCache() throws KeeperException, InterruptedException {
+    String safemodePath = ZookeeperPathConstants.getSafemodePath(DEFAULT);
+    ZooKeeperLockManager zooKeeperLockManager = new ZooKeeperLockManager(zooKeeper1, safemodePath);
+    zooKeeperLockManager.lock(DEFAULT);
+    assertTrue(clusterStatus2.isInSafeMode(false, DEFAULT));
+    zooKeeperLockManager.unlock(DEFAULT);
+    assertFalse(clusterStatus2.isInSafeMode(false, DEFAULT));
+  }
+
+  @Test
+  public void testSafeModeCache() throws KeeperException, InterruptedException {
+    String safemodePath = ZookeeperPathConstants.getSafemodePath(DEFAULT);
+    ZooKeeperLockManager zooKeeperLockManager = new ZooKeeperLockManager(zooKeeper1, safemodePath);
+    zooKeeperLockManager.lock(DEFAULT);
+    new WaitForAnswerToBeCorrect(20L) {
+      @Override
+      public Object run() {
+        return clusterStatus2.isInSafeMode(false, DEFAULT);
+      }
+    }.test(true);
+    zooKeeperLockManager.unlock(DEFAULT);
+    new WaitForAnswerToBeCorrect(20L) {
+      @Override
+      public Object run() {
+        return clusterStatus2.isInSafeMode(false, DEFAULT);
+      }
+    }.test(false);
+  }
 
   @Test
   public void testGetClusterNoTable() {
     LOG.warn("testGetCluster");
-    assertNull(clusterStatus.getCluster(false, TEST));
-    assertNull(clusterStatus.getCluster(true, TEST));
+    assertNull(clusterStatus2.getCluster(false, TEST));
+    assertNull(clusterStatus2.getCluster(true, TEST));
   }
 
   @Test
   public void testGetClusterTable() throws KeeperException, InterruptedException {
     LOG.warn("testGetCluster");
     createTable(TEST);
-    assertEquals(DEFAULT, clusterStatus.getCluster(false, TEST));
+    assertEquals(DEFAULT, clusterStatus2.getCluster(false, TEST));
     new WaitForAnswerToBeCorrect(20L) {
       @Override
       public Object run() {
-        return clusterStatus.getCluster(true, TEST);
+        return clusterStatus2.getCluster(true, TEST);
       }
     }.test(DEFAULT);
   }
@@ -175,19 +175,19 @@ public class ZookeeperClusterStatusTest {
   @Test
   public void testGetTableList() throws KeeperException, InterruptedException {
     testGetClusterTable();
-    assertEquals(Arrays.asList(TEST), clusterStatus.getTableList(false));
+    assertEquals(Arrays.asList(TEST), clusterStatus2.getTableList(false));
   }
 
   @Test
   public void testIsEnabledNoTable() {
     try {
-      clusterStatus.isEnabled(false, DEFAULT, "notable");
+      clusterStatus1.isEnabled(false, DEFAULT, "notable");
       fail("should throw exception.");
     } catch (RuntimeException e) {
 
     }
     try {
-      clusterStatus.isEnabled(true, DEFAULT, "notable");
+      clusterStatus1.isEnabled(true, DEFAULT, "notable");
       fail("should throw exception.");
     } catch (RuntimeException e) {
 
@@ -197,19 +197,61 @@ public class ZookeeperClusterStatusTest {
   @Test
   public void testIsEnabledDisabledTable() throws KeeperException, InterruptedException {
     createTable("disabledtable", false);
-    assertFalse(clusterStatus.isEnabled(false, DEFAULT, "disabledtable"));
-    assertFalse(clusterStatus.isEnabled(true, DEFAULT, "disabledtable"));
+    assertFalse(clusterStatus2.isEnabled(false, DEFAULT, "disabledtable"));
+    assertFalse(clusterStatus2.isEnabled(true, DEFAULT, "disabledtable"));
   }
 
   @Test
   public void testIsEnabledEnabledTable() throws KeeperException, InterruptedException {
     createTable("enabledtable", true);
-    assertTrue(clusterStatus.isEnabled(false, DEFAULT, "enabledtable"));
+    assertTrue(clusterStatus2.isEnabled(false, DEFAULT, "enabledtable"));
+
+    new WaitForAnswerToBeCorrect(20L) {
+      @Override
+      public Object run() {
+        return clusterStatus2.isEnabled(true, DEFAULT, "enabledtable");
+      }
+    }.test(true);
+  }
+
+  @Test
+  public void testDisablingTableNoCache() throws KeeperException, InterruptedException {
+    createTable(TEST);
+    assertTrue(clusterStatus2.isEnabled(false, DEFAULT, TEST));
+    clusterStatus1.disableTable(DEFAULT, TEST);
+    assertFalse(clusterStatus2.isEnabled(false, DEFAULT, TEST));
+  }
+
+  @Test
+  public void testDisablingTableCache() throws KeeperException, InterruptedException {
+    createTable(TEST);
+    assertTrue(clusterStatus2.isEnabled(true, DEFAULT, TEST));
+    clusterStatus1.disableTable(DEFAULT, TEST);
+    new WaitForAnswerToBeCorrect(20L) {
+      @Override
+      public Object run() {
+        return clusterStatus2.isEnabled(true, DEFAULT, TEST);
+      }
+    }.test(false);
+  }
+
+  @Test
+  public void testEnablingTableNoCache() throws KeeperException, InterruptedException {
+    createTable(TEST, false);
+    assertFalse(clusterStatus2.isEnabled(false, DEFAULT, TEST));
+    clusterStatus1.enableTable(DEFAULT, TEST);
+    assertTrue(clusterStatus2.isEnabled(false, DEFAULT, TEST));
+  }
 
+  @Test
+  public void testEnablingTableCache() throws KeeperException, InterruptedException {
+    createTable(TEST, false);
+    assertFalse(clusterStatus2.isEnabled(true, DEFAULT, TEST));
+    clusterStatus1.enableTable(DEFAULT, TEST);
     new WaitForAnswerToBeCorrect(20L) {
       @Override
       public Object run() {
-        return clusterStatus.isEnabled(true, DEFAULT, "enabledtable");
+        return clusterStatus2.isEnabled(true, DEFAULT, TEST);
       }
     }.test(true);
   }
@@ -223,9 +265,9 @@ public class ZookeeperClusterStatusTest {
     tableDescriptor.setName(name);
     tableDescriptor.setTableUri("./target/tmp/zk_test_hdfs");
     tableDescriptor.setEnabled(enabled);
-    clusterStatus.createTable(tableDescriptor);
+    clusterStatus1.createTable(tableDescriptor);
     if (enabled) {
-      clusterStatus.enableTable(tableDescriptor.getCluster(), name);
+      clusterStatus1.enableTable(tableDescriptor.getCluster(), name);
     }
   }
 
@@ -259,24 +301,4 @@ public class ZookeeperClusterStatusTest {
     }
   }
 
-  private void setSafeModeInPast() throws KeeperException, InterruptedException {
-    String blurSafemodePath = ZookeeperPathConstants.getSafemodePath(DEFAULT);
-    Stat stat = zooKeeper.exists(blurSafemodePath, false);
-    byte[] data = Long.toString(System.currentTimeMillis() - 60000).getBytes();
-    if (stat == null) {
-      zooKeeper.create(blurSafemodePath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-    }
-    zooKeeper.setData(blurSafemodePath, data, -1);
-  }
-
-  private void setSafeModeInFuture() throws KeeperException, InterruptedException {
-    String blurSafemodePath = ZookeeperPathConstants.getSafemodePath(DEFAULT);
-    Stat stat = zooKeeper.exists(blurSafemodePath, false);
-    byte[] data = Long.toString(System.currentTimeMillis() + 60000).getBytes();
-    if (stat == null) {
-      zooKeeper.create(blurSafemodePath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-    }
-    zooKeeper.setData(blurSafemodePath, data, -1);
-  }
-
 }


Mime
View raw message