Updated Branches:
refs/heads/master 783f83455 -> 42606847f
Fixed BLUR-118
Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/42606847
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/42606847
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/42606847
Branch: refs/heads/master
Commit: 42606847fa71b58c848fcc29fc6184ade092242f
Parents: 783f834
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Sun Sep 29 11:16:22 2013 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Sun Sep 29 11:16:37 2013 -0400
----------------------------------------------------------------------
.../clusterstatus/ZookeeperClusterStatus.java | 79 ++++++++------------
1 file changed, 33 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/42606847/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatus.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatus.java
b/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatus.java
index 7ef482d..f51f19a 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatus.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatus.java
@@ -66,14 +66,12 @@ public class ZookeeperClusterStatus extends ClusterStatus {
private final ConcurrentMap<String, Set<String>> _tablesPerCluster = new ConcurrentHashMap<String,
Set<String>>();
private final AtomicReference<Set<String>> _clusters = new AtomicReference<Set<String>>(new
HashSet<String>());
private final ConcurrentMap<String, Boolean> _enabled = new ConcurrentHashMap<String,
Boolean>();
- private final ConcurrentMap<String, Boolean> _readOnly = new ConcurrentHashMap<String,
Boolean>();
private final WatchChildren _clusterWatcher;
private final ConcurrentMap<String, WatchChildren> _onlineShardsNodesWatchers = new
ConcurrentHashMap<String, WatchChildren>();
private final ConcurrentMap<String, WatchChildren> _tableWatchers = new ConcurrentHashMap<String,
WatchChildren>();
private final Map<String, SafeModeCacheEntry> _clusterToSafeMode = new ConcurrentHashMap<String,
ZookeeperClusterStatus.SafeModeCacheEntry>();
private final ConcurrentMap<String, WatchNodeExistance> _enabledWatchNodeExistance
= new ConcurrentHashMap<String, WatchNodeExistance>();
- private final ConcurrentMap<String, WatchNodeExistance> _readOnlyWatchNodeExistance
= new ConcurrentHashMap<String, WatchNodeExistance>();
public ZookeeperClusterStatus(ZooKeeper zooKeeper, BlurConfiguration configuration) {
_zk = zooKeeper;
@@ -171,25 +169,16 @@ public class ZookeeperClusterStatus extends ClusterStatus {
Set<String> newSet = new HashSet<String>(tables);
Set<String> oldSet = _tablesPerCluster.put(cluster, newSet);
Set<String> newTables = getNewTables(newSet, oldSet);
- for (String table : newTables) {
+ Set<String> oldTables = getOldTables(newSet, oldSet);
+ for (String table : oldTables) {
final String clusterTableKey = getClusterTableKey(cluster, table);
-
- WatchNodeExistance readOnlyWatcher = new WatchNodeExistance(_zk, ZookeeperPathConstants.getTableReadOnlyPath(
- cluster, table));
- readOnlyWatcher.watch(new WatchNodeExistance.OnChange() {
- @Override
- public void action(Stat stat) {
- if (stat == null) {
- _readOnly.put(clusterTableKey, Boolean.FALSE);
- } else {
- _readOnly.put(clusterTableKey, Boolean.TRUE);
- }
- }
- });
- if (_readOnlyWatchNodeExistance.putIfAbsent(clusterTableKey, readOnlyWatcher) !=
null) {
- readOnlyWatcher.close();
+ WatchNodeExistance watchNodeExistance = _enabledWatchNodeExistance.remove(clusterTableKey);
+ if (watchNodeExistance != null) {
+ watchNodeExistance.close();
}
-
+ }
+ for (String table : newTables) {
+ final String clusterTableKey = getClusterTableKey(cluster, table);
WatchNodeExistance enabledWatcher = new WatchNodeExistance(_zk, ZookeeperPathConstants.getTableEnabledPath(
cluster, table));
enabledWatcher.watch(new WatchNodeExistance.OnChange() {
@@ -208,6 +197,17 @@ public class ZookeeperClusterStatus extends ClusterStatus {
}
}
+ private Set<String> getOldTables(Set<String> newSet, Set<String> oldSet)
{
+ Set<String> oldTables = new HashSet<String>();
+ if (oldSet != null) {
+ oldTables.addAll(oldSet);
+ }
+ if (newSet != null) {
+ oldTables.removeAll(newSet);
+ }
+ return oldTables;
+ }
+
private Set<String> getNewTables(Set<String> newSet, Set<String> oldSet)
{
Set<String> newTables = new HashSet<String>(newSet);
if (oldSet != null) {
@@ -383,7 +383,7 @@ public class ZookeeperClusterStatus extends ClusterStatus {
public TableDescriptor getTableDescriptor(boolean useCache, String cluster, String table)
{
if (useCache) {
TableDescriptor tableDescriptor = _tableDescriptorCache.get(table);
- updateReadOnlyAndEnabled(useCache, tableDescriptor, cluster, table);
+ updateEnabled(useCache, tableDescriptor, cluster, table);
if (tableDescriptor != null) {
return tableDescriptor;
}
@@ -402,13 +402,14 @@ public class ZookeeperClusterStatus extends ClusterStatus {
tableDescriptor.blockCaching = isBlockCacheEnabled(cluster, table);
tableDescriptor.blockCachingFileTypes = getBlockCacheFileTypes(cluster, table);
tableDescriptor.name = table;
+ tableDescriptor.readOnly = internalGetReadOnly(ZookeeperPathConstants.getTableReadOnlyPath(cluster,
table));
tableDescriptor.preCacheCols = toList(getData(ZookeeperPathConstants
.getTableColumnsToPreCache(cluster, table)));
byte[] data = getData(ZookeeperPathConstants.getTableSimilarityPath(cluster, table));
if (data != null) {
tableDescriptor.similarityClass = new String(data);
}
- updateReadOnlyAndEnabled(useCache, tableDescriptor, cluster, table);
+ updateEnabled(useCache, tableDescriptor, cluster, table);
break LOOP;
} catch (NullPointerException e) {
npe = e;
@@ -432,6 +433,14 @@ public class ZookeeperClusterStatus extends ClusterStatus {
return tableDescriptor;
}
+ private boolean internalGetReadOnly(String tableReadOnlyPath) throws KeeperException, InterruptedException
{
+ Stat stat = _zk.exists(tableReadOnlyPath, false);
+ if (stat == null) {
+ return false;
+ }
+ return true;
+ }
+
private static List<String> toList(byte[] bs) {
if (bs == null) {
return null;
@@ -459,9 +468,8 @@ public class ZookeeperClusterStatus extends ClusterStatus {
return builder.toString().getBytes();
}
- private void updateReadOnlyAndEnabled(boolean useCache, TableDescriptor tableDescriptor,
String cluster, String table) {
+ private void updateEnabled(boolean useCache, TableDescriptor tableDescriptor, String cluster,
String table) {
if (tableDescriptor != null) {
- tableDescriptor.setReadOnly(isReadOnly(useCache, cluster, table));
tableDescriptor.setEnabled(isEnabled(useCache, cluster, table));
}
}
@@ -509,7 +517,6 @@ public class ZookeeperClusterStatus extends ClusterStatus {
close(_onlineShardsNodesWatchers);
close(_tableWatchers);
close(_enabledWatchNodeExistance);
- close(_readOnlyWatchNodeExistance);
}
}
@@ -675,28 +682,8 @@ public class ZookeeperClusterStatus extends ClusterStatus {
@Override
public boolean isReadOnly(boolean useCache, String cluster, String table) {
- if (useCache) {
- Boolean ro = _readOnly.get(getClusterTableKey(cluster, table));
- if (ro != null) {
- return ro;
- }
- }
- long s = System.nanoTime();
- String path = ZookeeperPathConstants.getTableReadOnlyPath(cluster, table);
- try {
- checkIfOpen();
- if (_zk.exists(path, false) == null) {
- return false;
- }
- return true;
- } catch (KeeperException e) {
- throw new RuntimeException(e);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- } finally {
- long e = System.nanoTime();
- LOG.debug("trace isReadOnly took [" + (e - s) / 1000000.0 + " ms]");
- }
+ TableDescriptor tableDescriptor = getTableDescriptor(useCache, cluster, table);
+ return tableDescriptor.isReadOnly();
}
@Override
|