incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [3/6] git commit: Fixing a bunch of issues with the old way table descriptors were stored in ZK.
Date Mon, 03 Feb 2014 19:51:17 GMT
Fixing a bunch of issues with the old way table descriptors were stored in ZK.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/7b7b7c6d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/7b7b7c6d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/7b7b7c6d

Branch: refs/heads/apache-blur-0.2
Commit: 7b7b7c6daa8b60c862ab7786cf687852e5bec898
Parents: 8cb5580
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Mon Feb 3 14:15:04 2014 -0500
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Mon Feb 3 14:15:04 2014 -0500

----------------------------------------------------------------------
 .../clusterstatus/ZookeeperClusterStatus.java   | 237 ++++---------------
 .../clusterstatus/ZookeeperPathConstants.java   |  44 ----
 .../ZookeeperClusterStatusTest.java             |  14 +-
 3 files changed, 55 insertions(+), 240 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7b7b7c6d/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatus.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatus.java b/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatus.java
index 55b26ff..aaa9bd0 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatus.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatus.java
@@ -19,7 +19,6 @@ package org.apache.blur.manager.clusterstatus;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
@@ -50,11 +49,9 @@ import org.apache.blur.zookeeper.WatchNodeData;
 import org.apache.blur.zookeeper.WatchNodeExistance;
 import org.apache.blur.zookeeper.ZkUtils;
 import org.apache.blur.zookeeper.ZooKeeperLockManager;
-import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.Stat;
 
@@ -70,12 +67,13 @@ public class ZookeeperClusterStatus extends ClusterStatus {
   private final ConcurrentMap<String, Set<String>> _tablesPerCluster = new ConcurrentHashMap<String,
Set<String>>();
   private final AtomicReference<Set<String>> _clusters = new AtomicReference<Set<String>>(new
HashSet<String>());
   private final ConcurrentMap<String, Boolean> _enabled = new ConcurrentHashMap<String,
Boolean>();
+  private final Map<String, TableDescriptor> _tableDescriptorCache = new ConcurrentHashMap<String,
TableDescriptor>();
 
   private final WatchChildren _clusterWatcher;
   private final ConcurrentMap<String, WatchChildren> _onlineShardsNodesWatchers = new
ConcurrentHashMap<String, WatchChildren>();
   private final ConcurrentMap<String, WatchChildren> _tableWatchers = new ConcurrentHashMap<String,
WatchChildren>();
   private final Map<String, SafeModeCacheEntry> _clusterToSafeMode = new ConcurrentHashMap<String,
ZookeeperClusterStatus.SafeModeCacheEntry>();
-  private final ConcurrentMap<String, WatchNodeExistance> _enabledWatchNodeExistance
= new ConcurrentHashMap<String, WatchNodeExistance>();
+  private final ConcurrentMap<String, WatchNodeData> _enabledWatchNodeExistance = new
ConcurrentHashMap<String, WatchNodeData>();
 
   public ZookeeperClusterStatus(ZooKeeper zooKeeper, BlurConfiguration configuration) {
     _zk = zooKeeper;
@@ -176,26 +174,32 @@ public class ZookeeperClusterStatus extends ClusterStatus {
       Set<String> oldTables = getOldTables(newSet, oldSet);
       for (String table : oldTables) {
         final String clusterTableKey = getClusterTableKey(cluster, table);
-        WatchNodeExistance watchNodeExistance = _enabledWatchNodeExistance.remove(clusterTableKey);
-        if (watchNodeExistance != null) {
-          watchNodeExistance.close();
+        WatchNodeData watch = _enabledWatchNodeExistance.remove(clusterTableKey);
+        if (watch != null) {
+          watch.close();
         }
         _tableDescriptorCache.remove(table);
       }
       for (String table : newTables) {
         final String clusterTableKey = getClusterTableKey(cluster, table);
-        WatchNodeExistance enabledWatcher = new WatchNodeExistance(_zk, ZookeeperPathConstants.getTableEnabledPath(
-            cluster, table));
-        enabledWatcher.watch(new WatchNodeExistance.OnChange() {
+        WatchNodeData enabledWatcher = new WatchNodeData(_zk, ZookeeperPathConstants.getTablePath(cluster,
table));
+        new WatchNodeData.OnChange() {
           @Override
-          public void action(Stat stat) {
-            if (stat == null) {
-              _enabled.put(clusterTableKey, Boolean.FALSE);
+          public void action(byte[] data) {
+            if (data == null) {
+              _enabled.remove(clusterTableKey);
             } else {
-              _enabled.put(clusterTableKey, Boolean.TRUE);
+              TDeserializer deserializer = new TDeserializer(new TJSONProtocol.Factory());
+              TableDescriptor tableDescriptor = new TableDescriptor();
+              try {
+                deserializer.deserialize(tableDescriptor, data);
+              } catch (TException e) {
+                throw new RuntimeException(e);
+              }
+              _enabled.put(clusterTableKey, tableDescriptor.isEnabled());
             }
           }
-        });
+        };
         if (_enabledWatchNodeExistance.putIfAbsent(clusterTableKey, enabledWatcher) != null)
{
           enabledWatcher.close();
         }
@@ -355,7 +359,6 @@ public class ZookeeperClusterStatus extends ClusterStatus {
         }
       }
     }
-    long s = System.nanoTime();
     try {
       checkIfOpen();
       if (_zk.exists(ZookeeperPathConstants.getTablePath(cluster, table), false) == null)
{
@@ -366,45 +369,19 @@ public class ZookeeperClusterStatus extends ClusterStatus {
       throw new RuntimeException(e);
     } catch (InterruptedException e) {
       throw new RuntimeException(e);
-    } finally {
-      long e = System.nanoTime();
-      LOG.debug("trace exists took [" + (e - s) / 1000000.0 + " ms]");
     }
   }
 
   @Override
   public boolean isEnabled(boolean useCache, String cluster, String table) {
-    if (useCache) {
-      Boolean e = _enabled.get(getClusterTableKey(cluster, table));
-      if (e != null) {
-        return e;
-      }
-    }
-    long s = System.nanoTime();
-    String tablePathIsEnabled = ZookeeperPathConstants.getTableEnabledPath(cluster, table);
-    try {
-      checkIfOpen();
-      if (_zk.exists(tablePathIsEnabled, false) == null) {
-        return false;
-      }
-    } catch (KeeperException e) {
-      throw new RuntimeException(e);
-    } catch (InterruptedException e) {
-      throw new RuntimeException(e);
-    } finally {
-      long e = System.nanoTime();
-      LOG.debug("trace isEnabled took [" + (e - s) / 1000000.0 + " ms]");
-    }
-    return true;
+    TableDescriptor tableDescriptor = getTableDescriptor(useCache, cluster, table);
+    return tableDescriptor.isEnabled();
   }
 
-  private Map<String, TableDescriptor> _tableDescriptorCache = new ConcurrentHashMap<String,
TableDescriptor>();
-
   @Override
   public TableDescriptor getTableDescriptor(boolean useCache, String cluster, String table)
{
     if (useCache) {
       TableDescriptor tableDescriptor = _tableDescriptorCache.get(table);
-      updateEnabled(useCache, tableDescriptor, cluster, table);
       if (tableDescriptor != null) {
         return tableDescriptor;
       }
@@ -413,29 +390,13 @@ public class ZookeeperClusterStatus extends ClusterStatus {
     TableDescriptor tableDescriptor = new TableDescriptor();
     try {
       checkIfOpen();
-
       String blurTablePath = ZookeeperPathConstants.getTablePath(cluster, table);
       byte[] bytes = getData(blurTablePath);
-
-      if (bytes == null || bytes.length == 0) {
-        /*
-         * table descriptor is stored in an older format where we manually
-         * serialized each field into a different zookeeper node so we fetch it
-         * using old code and serialize it again with thrift protocol
-         */
-        LOG.info("The schema of Table [{0}] was stored in an older format. Now converting
it to the new format", table);
-        getOldTableDescriptor(useCache, cluster, table, tableDescriptor);
-
-        BlurUtil.removeAll(_zk, blurTablePath);
-
-        // store it using thrift protocol
-        byte[] newFormatBytes = serializeTableDescriptor(tableDescriptor);
-        BlurUtil.createPath(_zk, blurTablePath, newFormatBytes);
-
-      } else {
-        TDeserializer deserializer = new TDeserializer(new TJSONProtocol.Factory());
-        deserializer.deserialize(tableDescriptor, bytes);
+      if (bytes == null) {
+        throw new RuntimeException("Table [" + table + "] in cluster [" + cluster + "] not
found.");
       }
+      TDeserializer deserializer = new TDeserializer(new TJSONProtocol.Factory());
+      deserializer.deserialize(tableDescriptor, bytes);
     } catch (TException e) {
       throw new RuntimeException(e);
     } catch (KeeperException e) {
@@ -451,78 +412,6 @@ public class ZookeeperClusterStatus extends ClusterStatus {
     return tableDescriptor;
   }
 
-  private TableDescriptor getOldTableDescriptor(boolean useCache, String cluster, String
table,
-      TableDescriptor tableDescriptor) {
-    long s = System.nanoTime();
-    try {
-
-      NullPointerException npe = null;
-      LOOP: for (int i = 0; i < 10; i++) {
-        npe = null;
-        try {
-          tableDescriptor.shardCount = Integer.parseInt(new String(getData(ZookeeperPathConstants
-              .getTableShardCountPath(cluster, table))));
-          tableDescriptor.tableUri = new String(getData(ZookeeperPathConstants.getTableUriPath(cluster,
table)));
-          tableDescriptor.blockCaching = isBlockCacheEnabled(cluster, table);
-          tableDescriptor.blockCachingFileTypes = getBlockCacheFileTypes(cluster, table);
-          tableDescriptor.name = table;
-          tableDescriptor.readOnly = internalGetReadOnly(ZookeeperPathConstants.getTableReadOnlyPath(cluster,
table));
-          tableDescriptor.preCacheCols = toList(getData(ZookeeperPathConstants
-              .getTableColumnsToPreCache(cluster, table)));
-          byte[] data = getData(ZookeeperPathConstants.getTableSimilarityPath(cluster, table));
-          if (data != null) {
-            tableDescriptor.similarityClass = new String(data);
-          }
-          updateEnabled(useCache, tableDescriptor, cluster, table);
-          break LOOP;
-        } catch (NullPointerException e) {
-          npe = e;
-          LOG.warn("Terrible hack to make the table admins pick up on changes to table descriptors
while it's being created.");
-          Thread.sleep(TimeUnit.SECONDS.toMillis(3));
-        }
-      }
-      if (npe != null) {
-        throw npe;
-      }
-    } catch (KeeperException e) {
-      throw new RuntimeException(e);
-    } catch (InterruptedException e) {
-      throw new RuntimeException(e);
-    } finally {
-      long e = System.nanoTime();
-      LOG.debug("trace getOldTableDescriptor took [" + (e - s) / 1000000.0 + " ms]");
-    }
-
-    return tableDescriptor;
-  }
-
-  private boolean internalGetReadOnly(String tableReadOnlyPath) throws KeeperException, InterruptedException
{
-    Stat stat = _zk.exists(tableReadOnlyPath, false);
-    if (stat == null) {
-      return false;
-    }
-    return true;
-  }
-
-  private static List<String> toList(byte[] bs) {
-    if (bs == null) {
-      return null;
-    }
-    String str = new String(bs);
-    String[] split = str.split(",");
-    List<String> list = new ArrayList<String>();
-    for (String s : split) {
-      list.add(s.trim());
-    }
-    return list;
-  }
-
-  private void updateEnabled(boolean useCache, TableDescriptor tableDescriptor, String cluster,
String table) {
-    if (tableDescriptor != null) {
-      tableDescriptor.setEnabled(isEnabled(useCache, cluster, table));
-    }
-  }
-
   private byte[] getData(String path) throws KeeperException, InterruptedException {
     Stat stat = _zk.exists(path, false);
     if (stat == null) {
@@ -663,50 +552,6 @@ public class ZookeeperClusterStatus extends ClusterStatus {
     }
   }
 
-  private Set<String> getBlockCacheFileTypes(String cluster, String table) {
-    long s = System.nanoTime();
-    try {
-      byte[] data = getData(ZookeeperPathConstants.getTableBlockCachingFileTypesPath(cluster,
table));
-      if (data == null) {
-        return null;
-      }
-      String str = new String(data);
-      if (str.isEmpty()) {
-        return null;
-      }
-      Set<String> types = new HashSet<String>(Arrays.asList(str.split(",")));
-      if (types.isEmpty()) {
-        return null;
-      }
-      return types;
-    } catch (KeeperException e) {
-      throw new RuntimeException(e);
-    } catch (InterruptedException e) {
-      throw new RuntimeException(e);
-    } finally {
-      long e = System.nanoTime();
-      LOG.debug("trace getBlockCacheFileTypes took [" + (e - s) / 1000000.0 + " ms]");
-    }
-  }
-
-  private boolean isBlockCacheEnabled(String cluster, String table) {
-    long s = System.nanoTime();
-    try {
-      checkIfOpen();
-      if (_zk.exists(ZookeeperPathConstants.getTableBlockCachingFileTypesPath(cluster, table),
false) == null) {
-        return false;
-      }
-    } catch (KeeperException e) {
-      throw new RuntimeException(e);
-    } catch (InterruptedException e) {
-      throw new RuntimeException(e);
-    } finally {
-      long e = System.nanoTime();
-      LOG.debug("trace isBlockCacheEnabled took [" + (e - s) / 1000000.0 + " ms]");
-    }
-    return true;
-  }
-
   @Override
   public boolean isReadOnly(boolean useCache, String cluster, String table) {
     TableDescriptor tableDescriptor = getTableDescriptor(useCache, cluster, table);
@@ -772,26 +617,26 @@ public class ZookeeperClusterStatus extends ClusterStatus {
 
   @Override
   public void disableTable(String cluster, String table) {
-    long s = System.nanoTime();
     try {
       checkIfOpen();
-      if (_zk.exists(ZookeeperPathConstants.getTablePath(cluster, table), false) == null)
{
+      String tablePath = ZookeeperPathConstants.getTablePath(cluster, table);
+      Stat stat = _zk.exists(tablePath, false);
+      if (stat == null) {
         throw new IOException("Table [" + table + "] does not exist.");
       }
-      String blurTableEnabledPath = ZookeeperPathConstants.getTableEnabledPath(cluster, table);
-      if (_zk.exists(blurTableEnabledPath, false) == null) {
-        throw new IOException("Table [" + table + "] already disabled.");
+      TableDescriptor tableDescriptor = getTableDescriptor(false, cluster, table);
+      if (!tableDescriptor.isEnabled()) {
+        return;
       }
-      _zk.delete(blurTableEnabledPath, -1);
+      tableDescriptor.setEnabled(false);
+      byte[] bytes = serializeTableDescriptor(tableDescriptor);
+      _zk.setData(tablePath, bytes, stat.getVersion());
     } catch (IOException e) {
       throw new RuntimeException(e);
     } catch (InterruptedException e) {
       throw new RuntimeException(e);
     } catch (KeeperException e) {
       throw new RuntimeException(e);
-    } finally {
-      long e = System.nanoTime();
-      LOG.debug("trace disableTable took [" + (e - s) / 1000000.0 + " ms]");
     }
   }
 
@@ -800,14 +645,18 @@ public class ZookeeperClusterStatus extends ClusterStatus {
     long s = System.nanoTime();
     try {
       checkIfOpen();
-      if (_zk.exists(ZookeeperPathConstants.getTablePath(cluster, table), false) == null)
{
+      String tablePath = ZookeeperPathConstants.getTablePath(cluster, table);
+      Stat stat = _zk.exists(tablePath, false);
+      if (stat == null) {
         throw new IOException("Table [" + table + "] does not exist.");
       }
-      String blurTableEnabledPath = ZookeeperPathConstants.getTableEnabledPath(cluster, table);
-      if (_zk.exists(blurTableEnabledPath, false) != null) {
-        throw new IOException("Table [" + table + "] already enabled.");
+      TableDescriptor tableDescriptor = getTableDescriptor(false, cluster, table);
+      if (tableDescriptor.isEnabled()) {
+        return;
       }
-      _zk.create(blurTableEnabledPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+      tableDescriptor.setEnabled(true);
+      byte[] bytes = serializeTableDescriptor(tableDescriptor);
+      _zk.setData(tablePath, bytes, stat.getVersion());
     } catch (IOException e) {
       throw new RuntimeException(e);
     } catch (InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7b7b7c6d/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperPathConstants.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperPathConstants.java b/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperPathConstants.java
index 408bc75..b342d3e 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperPathConstants.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperPathConstants.java
@@ -39,18 +39,6 @@ public class ZookeeperPathConstants {
     return getBasePath() + "/controller-nodes";
   }
 
-  public static String getTableEnabledPath(String cluster, String table) {
-    return getTablePath(cluster, table) + "/enabled";
-  }
-
-  public static String getTableUriPath(String cluster, String table) {
-    return getTablePath(cluster, table) + "/uri";
-  }
-
-  public static String getTableShardCountPath(String cluster, String table) {
-    return getTablePath(cluster, table) + "/shard-count";
-  }
-
   public static String getOnlineShardsPath(String cluster) {
     return getClusterPath(cluster) + "/online-nodes";
   }
@@ -75,38 +63,6 @@ public class ZookeeperPathConstants {
     return getClusterPath(cluster) + "/registered-nodes";
   }
 
-  public static String getLockPath(String cluster, String table) {
-    return getTablePath(cluster, table) + "/locks";
-  }
-
-  public static String getTableBlockCachingFileTypesPath(String cluster, String table) {
-    return getTablePath(cluster, table) + "/blockcachingfiletypes";
-  }
-
-  public static String getTableBlockCachingPath(String cluster, String table) {
-    return getTablePath(cluster, table) + "/blockcaching";
-  }
-
-  public static String getTableSimilarityPath(String cluster, String table) {
-    return getTablePath(cluster, table) + "/similarity";
-  }
-
-  public static String getTableFieldNamesPath(String cluster, String table) {
-    return getTablePath(cluster, table) + "/fieldnames";
-  }
-
-  public static String getTableFieldNamesPath(String cluster, String table, String fieldName)
{
-    return getTableFieldNamesPath(cluster, table) + "/" + fieldName;
-  }
-
-  public static String getTableReadOnlyPath(String cluster, String table) {
-    return getTablePath(cluster, table) + "/readonly";
-  }
-
-  public static String getTableColumnsToPreCache(String cluster, String table) {
-    return getTablePath(cluster, table) + "/precache";
-  }
-
   public static String getShardLayoutPath(String cluster) {
     return getClusterPath(cluster) + "/layout";
   }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7b7b7c6d/blur-core/src/test/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatusTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatusTest.java b/blur-core/src/test/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatusTest.java
index b8d0cf7..3263b33 100644
--- a/blur-core/src/test/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatusTest.java
+++ b/blur-core/src/test/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatusTest.java
@@ -180,8 +180,18 @@ public class ZookeeperClusterStatusTest {
 
   @Test
   public void testIsEnabledNoTable() {
-    assertFalse(clusterStatus.isEnabled(false, DEFAULT, "notable"));
-    assertFalse(clusterStatus.isEnabled(true, DEFAULT, "notable"));
+    try {
+      clusterStatus.isEnabled(false, DEFAULT, "notable");
+      fail("should throw exception.");
+    } catch (RuntimeException e) {
+
+    }
+    try {
+      clusterStatus.isEnabled(true, DEFAULT, "notable");
+      fail("should throw exception.");
+    } catch (RuntimeException e) {
+
+    }
   }
 
   @Test


Mime
View raw message