incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [4/4] git commit: This fixes BLUR-74.
Date Mon, 22 Apr 2013 02:16:34 GMT
This fixes BLUR-74.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/7dd0ea55
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/7dd0ea55
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/7dd0ea55

Branch: refs/heads/0.1.5
Commit: 7dd0ea55044276e87a85a69988e69a471a343db5
Parents: 830def5
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Sun Apr 21 22:15:59 2013 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Sun Apr 21 22:15:59 2013 -0400

----------------------------------------------------------------------
 .../java/org/apache/blur/manager/IndexServer.java  |    3 +
 .../indexserver/DistributedIndexServer.java        |   43 +-
 .../blur/manager/indexserver/LocalIndexServer.java |    6 +
 .../manager/indexserver/ShardStateManager.java     |  186 +++
 .../apache/blur/manager/writer/BlurNRTIndex.java   |   22 +-
 .../apache/blur/thrift/BlurControllerServer.java   |  126 +-
 .../org/apache/blur/thrift/BlurShardServer.java    |   32 +-
 .../java/org/apache/blur/thrift/TableAdmin.java    |  116 +-
 .../apache/blur/thrift/ThriftBlurShardServer.java  |   11 +-
 .../org/apache/blur/thrift/BlurClusterTest.java    |   28 +-
 .../apache/blur/thrift/BlurShardServerTest.java    |    6 +
 .../org/apache/blur/thrift/DoNothingServer.java    |    8 +-
 .../blur/store/blockcache/BlockDirectory.java      |   30 +-
 .../org/apache/blur/store/hdfs/HdfsDirectory.java  |    2 +-
 .../org/apache/blur/thrift/generated/Blur.java     | 1280 +++++++++++++--
 .../apache/blur/thrift/generated/ShardState.java   |   79 +
 .../src/main/scripts/interface/Blur.thrift         |   37 +-
 .../src/main/scripts/interface/gen-html/Blur.html  |   36 +-
 .../src/main/scripts/interface/gen-html/index.html |    2 +
 .../org/apache/blur/thrift/generated/Blur.java     | 1280 +++++++++++++--
 .../apache/blur/thrift/generated/ShardState.java   |   79 +
 .../src/main/scripts/interface/gen-js/Blur.js      |  437 ++++--
 .../main/scripts/interface/gen-js/Blur_types.js    |    8 +
 .../main/scripts/interface/gen-perl/Blur/Blur.pm   |  409 ++++-
 .../main/scripts/interface/gen-perl/Blur/Types.pm  |    7 +
 .../src/main/scripts/interface/gen-rb/blur.rb      |   61 +
 .../main/scripts/interface/gen-rb/blur_types.rb    |   11 +
 27 files changed, 3809 insertions(+), 536 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7dd0ea55/src/blur-core/src/main/java/org/apache/blur/manager/IndexServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/IndexServer.java b/src/blur-core/src/main/java/org/apache/blur/manager/IndexServer.java
index 1097048..62d80b9 100644
--- a/src/blur-core/src/main/java/org/apache/blur/manager/IndexServer.java
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/IndexServer.java
@@ -23,6 +23,7 @@ import java.util.SortedSet;
 
 import org.apache.blur.analysis.BlurAnalyzer;
 import org.apache.blur.manager.writer.BlurIndex;
+import org.apache.blur.thrift.generated.ShardState;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.lucene.search.similarities.Similarity;
 
@@ -174,4 +175,6 @@ public interface IndexServer {
    */
   void close();
 
+  Map<String, ShardState> getShardState(String table);
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7dd0ea55/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java b/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
index 48c9a9a..8e13047 100644
--- a/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
@@ -65,6 +65,7 @@ import org.apache.blur.store.blockcache.BlockDirectory;
 import org.apache.blur.store.blockcache.Cache;
 import org.apache.blur.store.hdfs.BlurLockFactory;
 import org.apache.blur.store.hdfs.HdfsDirectory;
+import org.apache.blur.thrift.generated.ShardState;
 import org.apache.blur.thrift.generated.TableDescriptor;
 import org.apache.blur.utils.BlurConstants;
 import org.apache.blur.utils.BlurUtil;
@@ -103,6 +104,7 @@ public class DistributedIndexServer extends AbstractIndexServer {
   private Map<String, DistributedLayoutManager> _layoutManagers = new ConcurrentHashMap<String, DistributedLayoutManager>();
   private Map<String, Set<String>> _layoutCache = new ConcurrentHashMap<String, Set<String>>();
   private ConcurrentHashMap<String, Map<String, BlurIndex>> _indexes = new ConcurrentHashMap<String, Map<String, BlurIndex>>();
+  private final ShardStateManager _shardStateManager = new ShardStateManager();
 
   // set externally
   private ClusterStatus _clusterStatus;
@@ -330,6 +332,11 @@ public class DistributedIndexServer extends AbstractIndexServer {
     }
   }
 
+  @Override
+  public Map<String, ShardState> getShardState(String table) { // per-shard state for one table, keyed as ShardStateManager returns it
+    return _shardStateManager.getShardState(table); // pure delegation to the manager held in the _shardStateManager field
+  }
+
   private void setupFlushCacheTimer() {
     _timerCacheFlush = new Timer("Flush-IndexServer-Caches", true);
     _timerCacheFlush.schedule(new TimerTask() {
@@ -386,9 +393,12 @@ public class DistributedIndexServer extends AbstractIndexServer {
     LOG.info("Closing index [{0}] from table [{1}] shard [{2}]", index, table, shard);
     try {
       _filterCache.closing(table, shard, index);
+      _shardStateManager.closing(table, shard);
       index.close();
+      _shardStateManager.closed(table, shard);
     } catch (Throwable e) {
       LOG.error("Error while closing index [{0}] from table [{1}] shard [{2}]", e, index, table, shard);
+      _shardStateManager.closingError(table, shard);
     }
   }
 
@@ -411,6 +421,7 @@ public class DistributedIndexServer extends AbstractIndexServer {
   @Override
   public void close() {
     if (_running.get()) {
+      _shardStateManager.close();
       _running.set(false);
       closeAllIndexes();
       _watchOnlineShards.close();
@@ -497,7 +508,6 @@ public class DistributedIndexServer extends AbstractIndexServer {
   private BlurIndex openShard(String table, String shard) throws IOException {
     LOG.info("Opening shard [{0}] for table [{1}]", shard, table);
     Path tablePath = new Path(getTableDescriptor(table).tableUri);
-    Path walTablePath = new Path(tablePath, LOGS);
     Path hdfsDirPath = new Path(tablePath, shard);
 
     BlurLockFactory lockFactory = new BlurLockFactory(_configuration, hdfsDirPath, _nodeName, BlurConstants.getPid());
@@ -506,22 +516,6 @@ public class DistributedIndexServer extends AbstractIndexServer {
     directory.setLockFactory(lockFactory);
 
     TableDescriptor descriptor = _clusterStatus.getTableDescriptor(true, _cluster, table);
-    String compressionClass = descriptor.compressionClass;
-    int compressionBlockSize = descriptor.compressionBlockSize;
-    if (compressionClass != null) {
-      // throw new RuntimeException("Not supported yet");
-      LOG.error("Not supported yet");
-      // CompressionCodec compressionCodec;
-      // try {
-      // compressionCodec = BlurUtil.getInstance(compressionClass,
-      // CompressionCodec.class);
-      // directory = new CompressedFieldDataDirectory(directory,
-      // compressionCodec, compressionBlockSize);
-      // } catch (Exception e) {
-      // throw new IOException(e);
-      // }
-    }
-
     TableContext tableContext = TableContext.create(descriptor);
     ShardContext shardContext = ShardContext.create(tableContext, shard);
 
@@ -547,7 +541,7 @@ public class DistributedIndexServer extends AbstractIndexServer {
       // reader.setSimilarity(getSimilarity(table));
       // reader.init();
       // index = reader;
-      throw new RuntimeException("not impl");
+      throw new RuntimeException("not implemented yet");
     } else {
       BlurNRTIndex writer = new BlurNRTIndex(shardContext, _mergeScheduler, _closer, dir, _gc, _searchExecutor);
       index = writer;
@@ -600,7 +594,18 @@ public class DistributedIndexServer extends AbstractIndexServer {
         Future<BlurIndex> submit = _openerService.submit(new Callable<BlurIndex>() {
           @Override
           public BlurIndex call() throws Exception {
-            return openShard(table, shard);
+            _shardStateManager.opening(table, shard);
+            try {
+              BlurIndex openShard = openShard(table, shard);
+              _shardStateManager.open(table, shard);
+              return openShard;
+            } catch (Exception e) {
+              _shardStateManager.openingError(table, shard);
+              throw e;
+            } catch (Throwable t) {
+              _shardStateManager.openingError(table, shard);
+              throw new RuntimeException(t);
+            }
           }
         });
         opening.put(shard, submit);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7dd0ea55/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java b/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java
index 3f17bdb..94270e5 100644
--- a/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java
@@ -44,6 +44,7 @@ import org.apache.blur.manager.writer.BlurNRTIndex;
 import org.apache.blur.manager.writer.SharedMergeScheduler;
 import org.apache.blur.server.ShardContext;
 import org.apache.blur.server.TableContext;
+import org.apache.blur.thrift.generated.ShardState;
 import org.apache.blur.thrift.generated.TableDescriptor;
 import org.apache.blur.utils.BlurConstants;
 import org.apache.blur.utils.BlurUtil;
@@ -244,4 +245,9 @@ public class LocalIndexServer extends AbstractIndexServer {
   public int getCompressionBlockSize(String table) {
     throw new RuntimeException("Should not be used.");
   }
+
+  @Override
+  public Map<String, ShardState> getShardState(String table) { // required by IndexServer; this implementation never returns a map
+    throw new RuntimeException("Not supported yet."); // always throws an unchecked exception, whatever the table argument is
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7dd0ea55/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/ShardStateManager.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/ShardStateManager.java b/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/ShardStateManager.java
new file mode 100644
index 0000000..b507736
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/ShardStateManager.java
@@ -0,0 +1,186 @@
+package org.apache.blur.manager.indexserver;
+
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.blur.thrift.generated.ShardState;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * This class holds the current state of any given shard within the shard
+ * server.
+ * 
+ * States are recorded per (table, shard) pair through the opening, open,
+ * openingError, closing, closed and closingError methods, and are read back
+ * for one table at a time with getShardState.
+ * 
+ * The constructor starts a timer that periodically removes entries whose
+ * removal time has passed. Only CLOSED and CLOSING_ERROR entries are given a
+ * finite removal time; every other state is stored with Long.MAX_VALUE and is
+ * therefore kept until it is overwritten. Call close() to cancel the timer.
+ */
+public class ShardStateManager implements Closeable {
+
+  private static final long _5_SECONDS = TimeUnit.SECONDS.toMillis(5); // used as both the initial delay and the period of the cleanup task
+  private static final long _60_SECONDS = TimeUnit.SECONDS.toMillis(60); // added to "now" to get the removal time of CLOSED / CLOSING_ERROR entries
+  private final Map<Key, Value> stateMap = new ConcurrentHashMap<Key, Value>(); // latest recorded state per (table, shard)
+  private final Timer timer; // runs the cleanup task; cancelled in close()
+
+  public ShardStateManager() { // creates the manager and schedules the recurring cleanup task
+    timer = new Timer("ShardStateManager-cleanup", true); // second argument true: the timer thread is a daemon thread
+    timer.schedule(new TimerTask() {
+      @Override
+      public void run() { // two passes: collect the expired keys, then remove them
+        Collection<Key> toBeDeleted = null; // allocated lazily, only once an expired entry is found
+        for (Entry<Key, Value> e : stateMap.entrySet()) {
+          if (shouldBeRemoved(e)) {
+            if (toBeDeleted == null) {
+              toBeDeleted = new HashSet<ShardStateManager.Key>();
+            }
+            toBeDeleted.add(e.getKey());
+          }
+        }
+        if (toBeDeleted != null) {
+          for (Key k : toBeDeleted) {
+            stateMap.remove(k); // NOTE(review): removes by key only, so a state stored by setState after the scan above would be dropped too - confirm whether remove(key, value) is wanted
+          }
+        }
+      }
+
+      private boolean shouldBeRemoved(Entry<Key, Value> e) { // true once the entry's removal time lies in the past
+        if (e.getValue().timeToBeRemoved < System.currentTimeMillis()) { // Long.MAX_VALUE entries never satisfy this
+          return true;
+        }
+        return false;
+      }
+    }, _5_SECONDS, _5_SECONDS);
+  }
+
+  public void opening(String table, String shard) { // records ShardState.OPENING for the shard
+    setState(table, shard, ShardState.OPENING);
+  }
+
+  public void open(String table, String shard) { // records ShardState.OPEN for the shard
+    setState(table, shard, ShardState.OPEN);
+  }
+
+  public void openingError(String table, String shard) { // records ShardState.OPENING_ERROR for the shard
+    setState(table, shard, ShardState.OPENING_ERROR);
+  }
+
+  public void closing(String table, String shard) { // records ShardState.CLOSING for the shard
+    setState(table, shard, ShardState.CLOSING);
+  }
+
+  public void closed(String table, String shard) { // records ShardState.CLOSED; the entry becomes eligible for cleanup later
+    setState(table, shard, ShardState.CLOSED);
+  }
+
+  public void closingError(String table, String shard) { // records ShardState.CLOSING_ERROR; the entry becomes eligible for cleanup later
+    setState(table, shard, ShardState.CLOSING_ERROR);
+  }
+
+  public Map<String, ShardState> getShardState(String table) { // returns a new map of shard name to state for the given table
+    Map<String, ShardState> result = new HashMap<String, ShardState>(); // fresh HashMap per call; empty when no entry matches the table
+    List<Entry<Key, Value>> entryList = new ArrayList<Entry<Key, Value>>(stateMap.entrySet()); // iterate over a copy of the entry set
+    for (Entry<Key, Value> entry : entryList) {
+      Key key = entry.getKey();
+      if (key.table.equals(table)) { // calls equals on the stored table name, so a null stored table would throw here
+        result.put(key.shard, entry.getValue().shardState);
+      }
+    }
+    return result;
+  }
+
+  private void setState(String table, String shard, ShardState state) { // stores the state, replacing any previous entry for the same key
+    switch (state) {
+    case CLOSED: // falls through: both closed states get a finite removal time
+    case CLOSING_ERROR:
+      stateMap.put(new Key(table, shard), new Value(state, System.currentTimeMillis() + _60_SECONDS));
+      return;
+    default: // every other state is stored without an expiry (Long.MAX_VALUE)
+      stateMap.put(new Key(table, shard), new Value(state));
+      return;
+    }
+  }
+
+  private static class Value { // immutable pair: the state and the time after which cleanup may remove it
+    final ShardState shardState;
+    final long timeToBeRemoved; // compared against System.currentTimeMillis() by the cleanup task
+
+    Value(ShardState shardState) { // no expiry: delegates with Long.MAX_VALUE
+      this(shardState, Long.MAX_VALUE);
+    }
+
+    Value(ShardState shardState, long timeToBeRemoved) {
+      this.shardState = shardState;
+      this.timeToBeRemoved = timeToBeRemoved;
+    }
+  }
+
+  private static class Key { // map key identifying one shard of one table; equality covers both fields
+    final String table;
+    final String shard;
+
+    Key(String table, String shard) {
+      this.table = table;
+      this.shard = shard;
+    }
+
+    @Override
+    public int hashCode() { // combines the hash codes of shard and table, using 0 for a null field
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((shard == null) ? 0 : shard.hashCode());
+      result = prime * result + ((table == null) ? 0 : table.hashCode());
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) { // equal when obj is a Key with equal shard and table; null fields match only null
+      if (this == obj)
+        return true;
+      if (obj == null)
+        return false;
+      if (getClass() != obj.getClass())
+        return false;
+      Key other = (Key) obj;
+      if (shard == null) {
+        if (other.shard != null)
+          return false;
+      } else if (!shard.equals(other.shard))
+        return false;
+      if (table == null) {
+        if (other.table != null)
+          return false;
+      } else if (!table.equals(other.table))
+        return false;
+      return true;
+    }
+  }
+
+  @Override
+  public void close() { // stops the cleanup timer; the recorded states in stateMap are left untouched
+    timer.cancel(); // no further cleanup runs are scheduled after this
+    timer.purge(); // drops cancelled tasks from the timer's queue
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7dd0ea55/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java b/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
index 511a6a1..6dbe939 100644
--- a/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
@@ -63,7 +63,7 @@ public class BlurNRTIndex extends BlurIndex {
   private final ShardContext _shardContext;
   private final TransactionRecorder _recorder;
   private final TrackingIndexWriter _trackingWriter;
-  
+
   private long _lastRefresh = 0;
 
   public BlurNRTIndex(ShardContext shardContext, SharedMergeScheduler mergeScheduler, IndexInputCloser closer,
@@ -144,15 +144,17 @@ public class BlurNRTIndex extends BlurIndex {
   @Override
   public void close() throws IOException {
     // @TODO make sure that locks are cleaned up.
-    _isClosed.set(true);
-    _committer.interrupt();
-    _refresher.close();
-    try {
-      _recorder.close();
-      _writer.close();
-      getNRTManager().close();
-    } finally {
-      _directory.close();
+    if (!_isClosed.get()) {
+      _isClosed.set(true);
+      _committer.interrupt();
+      _refresher.close();
+      try {
+        _recorder.close();
+        _writer.close();
+        getNRTManager().close();
+      } finally {
+        _directory.close();
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7dd0ea55/src/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java b/src/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
index 2ce0fe5..4e44bd9 100644
--- a/src/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
+++ b/src/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
@@ -50,9 +50,9 @@ import org.apache.blur.manager.results.MergerBlurResultIterable;
 import org.apache.blur.manager.stats.MergerTableStats;
 import org.apache.blur.manager.status.MergerQueryStatus;
 import org.apache.blur.manager.status.MergerQueryStatusSingle;
-import org.apache.blur.thrift.BException;
-import org.apache.blur.thrift.BlurClientManager;
 import org.apache.blur.thrift.commands.BlurCommand;
+import org.apache.blur.thrift.generated.Blur.Client;
+import org.apache.blur.thrift.generated.Blur.Iface;
 import org.apache.blur.thrift.generated.BlurException;
 import org.apache.blur.thrift.generated.BlurQuery;
 import org.apache.blur.thrift.generated.BlurQueryStatus;
@@ -61,22 +61,21 @@ import org.apache.blur.thrift.generated.FetchResult;
 import org.apache.blur.thrift.generated.RowMutation;
 import org.apache.blur.thrift.generated.Schema;
 import org.apache.blur.thrift.generated.Selector;
+import org.apache.blur.thrift.generated.ShardState;
 import org.apache.blur.thrift.generated.TableDescriptor;
 import org.apache.blur.thrift.generated.TableStats;
-import org.apache.blur.thrift.generated.Blur.Client;
-import org.apache.blur.thrift.generated.Blur.Iface;
 import org.apache.blur.utils.BlurConstants;
 import org.apache.blur.utils.BlurExecutorCompletionService;
 import org.apache.blur.utils.BlurUtil;
 import org.apache.blur.utils.ForkJoin;
+import org.apache.blur.utils.ForkJoin.Merger;
+import org.apache.blur.utils.ForkJoin.ParallelCall;
 import org.apache.blur.utils.QueryCache;
 import org.apache.blur.utils.QueryCacheEntry;
 import org.apache.blur.utils.QueryCacheKey;
-import org.apache.blur.utils.ForkJoin.Merger;
-import org.apache.blur.utils.ForkJoin.ParallelCall;
 import org.apache.blur.zookeeper.WatchChildren;
-import org.apache.blur.zookeeper.WatchNodeExistance;
 import org.apache.blur.zookeeper.WatchChildren.OnChange;
+import org.apache.blur.zookeeper.WatchNodeExistance;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.thrift.TException;
 import org.apache.zookeeper.CreateMode;
@@ -84,16 +83,17 @@ import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.data.Stat;
 
-
 public class BlurControllerServer extends TableAdmin implements Iface {
 
   public static abstract class BlurClient {
-    public abstract <T> T execute(String node, BlurCommand<T> command, int maxRetries, long backOffTime, long maxBackOffTime) throws Exception;
+    public abstract <T> T execute(String node, BlurCommand<T> command, int maxRetries, long backOffTime,
+        long maxBackOffTime) throws Exception;
   }
 
   public static class BlurClientRemote extends BlurClient {
     @Override
-    public <T> T execute(String node, BlurCommand<T> command, int maxRetries, long backOffTime, long maxBackOffTime) throws Exception {
+    public <T> T execute(String node, BlurCommand<T> command, int maxRetries, long backOffTime, long maxBackOffTime)
+        throws Exception {
       return BlurClientManager.execute(node, command, maxRetries, backOffTime, maxBackOffTime);
     }
   }
@@ -102,7 +102,8 @@ public class BlurControllerServer extends TableAdmin implements Iface {
   private static final Log LOG = LogFactory.getLog(BlurControllerServer.class);
 
   private ExecutorService _executor;
-  private AtomicReference<Map<String, Map<String, String>>> _shardServerLayout = new AtomicReference<Map<String, Map<String, String>>>(new HashMap<String, Map<String, String>>());
+  private AtomicReference<Map<String, Map<String, String>>> _shardServerLayout = new AtomicReference<Map<String, Map<String, String>>>(
+      new HashMap<String, Map<String, String>>());
   private BlurClient _client;
   private int _threadCount = 64;
   private AtomicBoolean _closed = new AtomicBoolean();
@@ -255,7 +256,8 @@ public class BlurControllerServer extends TableAdmin implements Iface {
     try {
       String onlineControllerPath = ZookeeperPathConstants.getOnlineControllersPath() + "/" + _nodeName;
       while (_zookeeper.exists(onlineControllerPath, false) != null) {
-        LOG.info("Node [{0}] already registered, waiting for path [{1}] to be released", _nodeName, onlineControllerPath);
+        LOG.info("Node [{0}] already registered, waiting for path [{1}] to be released", _nodeName,
+            onlineControllerPath);
         Thread.sleep(3000);
       }
       String version = BlurUtil.getVersion();
@@ -330,7 +332,8 @@ public class BlurControllerServer extends TableAdmin implements Iface {
             return new BlurResultIterableClient(client, table, blurQuery, facetCounts, _remoteFetchCount);
           }
         }, new MergerBlurResultIterable(blurQuery));
-        BlurResults results = BlurUtil.convertToHits(hitsIterable, blurQuery, facetCounts, _executor, selector, this, table);
+        BlurResults results = BlurUtil.convertToHits(hitsIterable, blurQuery, facetCounts, _executor, selector, this,
+            table);
         if (!validResults(results, shardCount, blurQuery)) {
           BlurClientManager.sleep(_defaultDelay, _maxDefaultDelay, retries, _maxDefaultRetries);
           continue OUTER;
@@ -369,8 +372,10 @@ public class BlurControllerServer extends TableAdmin implements Iface {
         }
       }, _maxFetchRetries, _fetchDelay, _maxFetchDelay);
     } catch (Exception e) {
-      LOG.error("Unknown error during fetch of row from table [{0}] selector [{1}] node [{2}]", e, table, selector, clientHostnamePort);
-      throw new BException("Unknown error during fetch of row from table [{0}] selector [{1}] node [{2}]", e, table, selector, clientHostnamePort);
+      LOG.error("Unknown error during fetch of row from table [{0}] selector [{1}] node [{2}]", e, table, selector,
+          clientHostnamePort);
+      throw new BException("Unknown error during fetch of row from table [{0}] selector [{1}] node [{2}]", e, table,
+          selector, clientHostnamePort);
     }
   }
 
@@ -478,7 +483,49 @@ public class BlurControllerServer extends TableAdmin implements Iface {
   }
 
   @Override
-  public long recordFrequency(final String table, final String columnFamily, final String columnName, final String value) throws BlurException, TException {
+  public Map<String, Map<String, ShardState>> shardServerLayoutState(final String table) throws BlurException, // NOTE(review): recordFrequency and terms call checkTable(table) first; this method does not - confirm intended
+      TException {
+    try {
+      return scatterGather(getCluster(table), new BlurCommand<Map<String, Map<String, ShardState>>>() { // scatterGather runs the command against the online shard servers of the table's cluster
+        @Override
+        public Map<String, Map<String, ShardState>> call(Client client) throws BlurException, TException { // forwards the same call to one shard server
+          try {
+            return client.shardServerLayoutState(table);
+          } catch (BlurException e) {
+            LOG.error("UNKOWN error from shard server", e); // logged here, then rethrown unchanged
+            throw e;
+          }
+        }
+      }, new Merger<Map<String, Map<String, ShardState>>>() {
+        @Override
+        public Map<String, Map<String, ShardState>> merge(
+            BlurExecutorCompletionService<Map<String, Map<String, ShardState>>> service) throws BlurException { // folds the per-server maps into one result map
+          Map<String, Map<String, ShardState>> result = new HashMap<String, Map<String, ShardState>>();
+          while (service.getRemainingCount() > 0) { // keep polling until the completion service reports nothing remaining
+            Future<Map<String, Map<String, ShardState>>> future = service.poll(_defaultParallelCallTimeout,
+                TimeUnit.MILLISECONDS, true, table);
+            Map<String, Map<String, ShardState>> shardResult = service.getResultThrowException(future, table);
+            for (Entry<String, Map<String, ShardState>> entry : shardResult.entrySet()) {
+              Map<String, ShardState> map = result.get(entry.getKey());
+              if (map == null) { // first response for this outer key: start a new inner map
+                map = new HashMap<String, ShardState>();
+                result.put(entry.getKey(), map);
+              }
+              map.putAll(entry.getValue()); // inner entries are unioned; a repeated inner key takes the later response's value
+            }
+          }
+          return result;
+        }
+      });
+    } catch (Exception e) { // any failure, including a BlurException rethrown above, is logged and wrapped in a BException
+      LOG.error("Unknown error while trying to get shard server layout [{0}]", e, table);
+      throw new BException("Unknown error while trying to get shard server layout [{0}]", e, table);
+    }
+  }
+
+  @Override
+  public long recordFrequency(final String table, final String columnFamily, final String columnName, final String value)
+      throws BlurException, TException {
     checkTable(table);
     try {
       return scatterGather(getCluster(table), new BlurCommand<Long>() {
@@ -492,15 +539,18 @@ public class BlurControllerServer extends TableAdmin implements Iface {
         public Long merge(BlurExecutorCompletionService<Long> service) throws BlurException {
           Long total = 0L;
           while (service.getRemainingCount() > 0) {
-            Future<Long> future = service.poll(_defaultParallelCallTimeout, TimeUnit.MILLISECONDS, true, table, columnFamily, columnName, value);
+            Future<Long> future = service.poll(_defaultParallelCallTimeout, TimeUnit.MILLISECONDS, true, table,
+                columnFamily, columnName, value);
             total += service.getResultThrowException(future, table, columnFamily, columnName, value);
           }
           return total;
         }
       });
     } catch (Exception e) {
-      LOG.error("Unknown error while trying to get record frequency [{0}/{1}/{2}/{3}]", e, table, columnFamily, columnName, value);
-      throw new BException("Unknown error while trying to get record frequency [{0}/{1}/{2}/{3}]", e, table, columnFamily, columnName, value);
+      LOG.error("Unknown error while trying to get record frequency [{0}/{1}/{2}/{3}]", e, table, columnFamily,
+          columnName, value);
+      throw new BException("Unknown error while trying to get record frequency [{0}/{1}/{2}/{3}]", e, table,
+          columnFamily, columnName, value);
     }
   }
 
@@ -536,7 +586,8 @@ public class BlurControllerServer extends TableAdmin implements Iface {
   }
 
   @Override
-  public List<String> terms(final String table, final String columnFamily, final String columnName, final String startWith, final short size) throws BlurException, TException {
+  public List<String> terms(final String table, final String columnFamily, final String columnName,
+      final String startWith, final short size) throws BlurException, TException {
     checkTable(table);
     try {
       return scatterGather(getCluster(table), new BlurCommand<List<String>>() {
@@ -549,17 +600,20 @@ public class BlurControllerServer extends TableAdmin implements Iface {
         public List<String> merge(BlurExecutorCompletionService<List<String>> service) throws BlurException {
           TreeSet<String> terms = new TreeSet<String>();
           while (service.getRemainingCount() > 0) {
-            Future<List<String>> future = service.poll(_defaultParallelCallTimeout, TimeUnit.MILLISECONDS, true, table, columnFamily, columnName, startWith, size);
+            Future<List<String>> future = service.poll(_defaultParallelCallTimeout, TimeUnit.MILLISECONDS, true, table,
+                columnFamily, columnName, startWith, size);
             terms.addAll(service.getResultThrowException(future, table, columnFamily, columnName, startWith, size));
           }
           return new ArrayList<String>(terms).subList(0, Math.min(terms.size(), size));
         }
       });
     } catch (Exception e) {
-      LOG.error("Unknown error while trying to terms table [{0}] columnFamily [{1}] columnName [{2}] startWith [{3}] size [{4}]", e, table, columnFamily, columnName, startWith,
-          size);
-      throw new BException("Unknown error while trying to terms table [{0}] columnFamily [{1}] columnName [{2}] startWith [{3}] size [{4}]", e, table, columnFamily, columnName,
-          startWith, size);
+      LOG.error(
+          "Unknown error while trying to terms table [{0}] columnFamily [{1}] columnName [{2}] startWith [{3}] size [{4}]",
+          e, table, columnFamily, columnName, startWith, size);
+      throw new BException(
+          "Unknown error while trying to terms table [{0}] columnFamily [{1}] columnName [{2}] startWith [{3}] size [{4}]",
+          e, table, columnFamily, columnName, startWith, size);
     }
   }
 
@@ -579,13 +633,15 @@ public class BlurControllerServer extends TableAdmin implements Iface {
   }
 
   private <R> R scatterGather(String cluster, final BlurCommand<R> command, Merger<R> merger) throws Exception {
-    return ForkJoin.execute(_executor, _clusterStatus.getOnlineShardServers(true, cluster), new ParallelCall<String, R>() {
-      @SuppressWarnings("unchecked")
-      @Override
-      public R call(String hostnamePort) throws Exception {
-        return _client.execute(hostnamePort, (BlurCommand<R>) command.clone(), _maxDefaultRetries, _defaultDelay, _maxDefaultDelay);
-      }
-    }).merge(merger);
+    return ForkJoin.execute(_executor, _clusterStatus.getOnlineShardServers(true, cluster),
+        new ParallelCall<String, R>() {
+          @SuppressWarnings("unchecked")
+          @Override
+          public R call(String hostnamePort) throws Exception {
+            return _client.execute(hostnamePort, (BlurCommand<R>) command.clone(), _maxDefaultRetries, _defaultDelay,
+                _maxDefaultDelay);
+          }
+        }).merge(merger);
   }
 
   private <R> void scatter(String cluster, BlurCommand<R> command) throws Exception {
@@ -803,8 +859,10 @@ public class BlurControllerServer extends TableAdmin implements Iface {
         }
       });
     } catch (Exception e) {
-      LOG.error("Unknown error while trying to optimize [table={0},numberOfSegmentsPerShard={1}]", e, table, numberOfSegmentsPerShard);
-      throw new BException("Unknown error while trying to optimize [table={0},numberOfSegmentsPerShard={1}]", e, table, numberOfSegmentsPerShard);
+      LOG.error("Unknown error while trying to optimize [table={0},numberOfSegmentsPerShard={1}]", e, table,
+          numberOfSegmentsPerShard);
+      throw new BException("Unknown error while trying to optimize [table={0},numberOfSegmentsPerShard={1}]", e, table,
+          numberOfSegmentsPerShard);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7dd0ea55/src/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java b/src/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
index 3622387..62e6ac3 100644
--- a/src/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
+++ b/src/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
@@ -21,8 +21,10 @@ import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_CACHE_MAX_TIMETOLIV
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_DATA_FETCH_THREAD_COUNT;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.TreeMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -37,7 +39,7 @@ import org.apache.blur.manager.IndexManager;
 import org.apache.blur.manager.IndexServer;
 import org.apache.blur.manager.results.BlurResultIterable;
 import org.apache.blur.manager.writer.BlurIndex;
-import org.apache.blur.thrift.BException;
+import org.apache.blur.thrift.generated.Blur.Iface;
 import org.apache.blur.thrift.generated.BlurException;
 import org.apache.blur.thrift.generated.BlurQuery;
 import org.apache.blur.thrift.generated.BlurQueryStatus;
@@ -46,8 +48,8 @@ import org.apache.blur.thrift.generated.FetchResult;
 import org.apache.blur.thrift.generated.RowMutation;
 import org.apache.blur.thrift.generated.Schema;
 import org.apache.blur.thrift.generated.Selector;
+import org.apache.blur.thrift.generated.ShardState;
 import org.apache.blur.thrift.generated.TableStats;
-import org.apache.blur.thrift.generated.Blur.Iface;
 import org.apache.blur.utils.BlurConstants;
 import org.apache.blur.utils.BlurUtil;
 import org.apache.blur.utils.QueryCache;
@@ -201,6 +203,31 @@ public class BlurShardServer extends TableAdmin implements Iface {
   }
 
   @Override
+  public Map<String, Map<String, ShardState>> shardServerLayoutState(String table) throws BlurException, TException {
+    try {
+      Map<String, Map<String, ShardState>> result = new TreeMap<String, Map<String, ShardState>>();
+      String nodeName = _indexServer.getNodeName();
+      Map<String, ShardState> stateMap = _indexServer.getShardState(table);
+      for (Entry<String, ShardState> entry : stateMap.entrySet()) {
+        result.put(entry.getKey(), newMap(nodeName, entry.getValue()));
+      }
+      return result;
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to getting shardServerLayoutState for table [" + table + "]", e);
+      if (e instanceof BlurException) {
+        throw (BlurException) e;
+      }
+      throw new BException(e.getMessage(), e);
+    }
+  }
+
+  private Map<String, ShardState> newMap(String nodeName, ShardState state) {
+    Map<String, ShardState> map = new HashMap<String, ShardState>();
+    map.put(nodeName, state);
+    return map;
+  }
+
+  @Override
   public long recordFrequency(String table, String columnFamily, String columnName, String value) throws BlurException,
       TException {
     checkTable(_cluster, table);
@@ -343,4 +370,5 @@ public class BlurShardServer extends TableAdmin implements Iface {
   public void setConfiguration(BlurConfiguration conf) {
     _configuration = conf;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7dd0ea55/src/blur-core/src/main/java/org/apache/blur/thrift/TableAdmin.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/TableAdmin.java b/src/blur-core/src/main/java/org/apache/blur/thrift/TableAdmin.java
index 52a4573..fe344de 100644
--- a/src/blur-core/src/main/java/org/apache/blur/thrift/TableAdmin.java
+++ b/src/blur-core/src/main/java/org/apache/blur/thrift/TableAdmin.java
@@ -18,21 +18,21 @@ package org.apache.blur.thrift;
  */
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 
 import org.apache.blur.BlurConfiguration;
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
 import org.apache.blur.manager.clusterstatus.ClusterStatus;
 import org.apache.blur.server.TableContext;
-import org.apache.blur.thrift.BException;
+import org.apache.blur.thrift.generated.Blur.Iface;
 import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.ShardState;
 import org.apache.blur.thrift.generated.TableDescriptor;
 import org.apache.blur.thrift.generated.TableStats;
-import org.apache.blur.thrift.generated.Blur.Iface;
 import org.apache.thrift.TException;
 import org.apache.zookeeper.ZooKeeper;
 
-
 public abstract class TableAdmin implements Iface {
 
   private static final Log LOG = LogFactory.getLog(TableAdmin.class);
@@ -67,7 +67,8 @@ public abstract class TableAdmin implements Iface {
       }
       _clusterStatus.createTable(tableDescriptor);
     } catch (Exception e) {
-      LOG.error("Unknown error during create of [table={0}, tableDescriptor={1}]", e, tableDescriptor.name, tableDescriptor);
+      LOG.error("Unknown error during create of [table={0}, tableDescriptor={1}]", e, tableDescriptor.name,
+          tableDescriptor);
       throw new BException(e.getMessage(), e);
     }
     if (tableDescriptor.isEnabled) {
@@ -92,25 +93,6 @@ public abstract class TableAdmin implements Iface {
     }
   }
 
-  private void waitForTheTableToDisengage(String cluster, String table) throws BlurException, TException {
-    // LOG.info("Waiting for shards to disengage on table [" + table + "]");
-  }
-
-  private void waitForTheTableToDisable(String cluster, String table) throws BlurException, TException {
-    LOG.info("Waiting for shards to disable on table [" + table + "]");
-    while (true) {
-      if (!_clusterStatus.isEnabled(false, cluster, table)) {
-        return;
-      }
-      try {
-        Thread.sleep(3000);
-      } catch (InterruptedException e) {
-        LOG.error("Unknown error while enabling table [" + table + "]", e);
-        throw new BException("Unknown error while enabling table [" + table + "]", e);
-      }
-    }
-  }
-
   @Override
   public final void enableTable(String table) throws BlurException, TException {
     try {
@@ -143,6 +125,10 @@ public abstract class TableAdmin implements Iface {
     }
   }
 
+  /**
+   * This method only works on controllers; if called on a shard server it
+   * only waits for that server itself to finish, not the whole cluster.
+   */
   private void waitForTheTableToEngage(String cluster, String table) throws BlurException, TException {
     TableDescriptor describe = describe(table);
     int shardCount = describe.shardCount;
@@ -155,9 +141,70 @@ public abstract class TableAdmin implements Iface {
         throw new BException("Unknown error while engaging table [" + table + "]", e);
       }
       try {
-        Map<String, String> shardServerLayout = shardServerLayout(table);
-        LOG.info("Shards [" + shardServerLayout.size() + "/" + shardCount + "] of table [" + table + "] engaged");
-        if (shardServerLayout.size() == shardCount) {
+        Map<String, Map<String, ShardState>> shardServerLayoutState = shardServerLayoutState(table);
+
+        int countNumberOfOpen = 0;
+        int countNumberOfOpening = 0;
+        for (Entry<String, Map<String, ShardState>> shardEntry : shardServerLayoutState.entrySet()) {
+          Map<String, ShardState> value = shardEntry.getValue();
+          for (ShardState state : value.values()) {
+            if (state == ShardState.OPEN) {
+              countNumberOfOpen++;
+            } else if (state == ShardState.OPENING) {
+              countNumberOfOpening++;
+            } else {
+              LOG.warn("Unexpected state of [{0}] for shard [{1}].", state, shardEntry.getKey());
+            }
+          }
+        }
+        LOG.info("Opening - Shards Open [{0}], Shards Opening [{1}] of table [{2}]", countNumberOfOpen,
+            countNumberOfOpening, table);
+        if (countNumberOfOpen == shardCount && countNumberOfOpening == 0) {
+          return;
+        }
+      } catch (BlurException e) {
+        LOG.info("Stilling waiting", e);
+      } catch (TException e) {
+        LOG.info("Stilling waiting", e);
+      }
+    }
+  }
+
+  /**
+   * This method only works on controllers; if called on a shard server it
+   * only waits for that server itself to finish, not the whole cluster.
+   */
+  private void waitForTheTableToDisengage(String cluster, String table) throws BlurException, TException {
+    LOG.info("Waiting for shards to disengage on table [" + table + "]");
+    while (true) {
+      try {
+        Thread.sleep(3000);
+      } catch (InterruptedException e) {
+        LOG.error("Unknown error while disengaging table [" + table + "]", e);
+        throw new BException("Unknown error while disengaging table [" + table + "]", e);
+      }
+      try {
+        Map<String, Map<String, ShardState>> shardServerLayoutState = shardServerLayoutState(table);
+
+        int countNumberOfOpen = 0;
+        int countNumberOfClosing = 0;
+        for (Entry<String, Map<String, ShardState>> shardEntry : shardServerLayoutState.entrySet()) {
+          Map<String, ShardState> value = shardEntry.getValue();
+          for (ShardState state : value.values()) {
+            if (state == ShardState.OPEN) {
+              countNumberOfOpen++;
+            } else if (state == ShardState.CLOSING) {
+              countNumberOfClosing++;
+            } else if (state == ShardState.CLOSED) {
+              LOG.info("Shard [{0}] of table [{1}] now reporting closed.", shardEntry.getKey(), table);
+            } else {
+              LOG.warn("Unexpected state of [{0}] for shard [{1}].", state, shardEntry.getKey());
+            }
+          }
+        }
+        LOG.info("Closing - Shards Open [{0}], Shards Closing [{1}] of table [{2}]", countNumberOfOpen,
+            countNumberOfClosing, table);
+        if (countNumberOfOpen == 0 && countNumberOfClosing == 0) {
           return;
         }
       } catch (BlurException e) {
@@ -168,6 +215,21 @@ public abstract class TableAdmin implements Iface {
     }
   }
 
+  private void waitForTheTableToDisable(String cluster, String table) throws BlurException, TException {
+    LOG.info("Waiting for shards to disable on table [" + table + "]");
+    while (true) {
+      if (!_clusterStatus.isEnabled(false, cluster, table)) {
+        return;
+      }
+      try {
+        Thread.sleep(3000);
+      } catch (InterruptedException e) {
+        LOG.error("Unknown error while enabling table [" + table + "]", e);
+        throw new BException("Unknown error while enabling table [" + table + "]", e);
+      }
+    }
+  }
+
   @Override
   public final void removeTable(String table, boolean deleteIndexFiles) throws BlurException, TException {
     try {
@@ -313,7 +375,7 @@ public abstract class TableAdmin implements Iface {
   public void setZookeeper(ZooKeeper zookeeper) {
     _zookeeper = zookeeper;
   }
-  
+
   public void setConfiguration(BlurConfiguration config) {
     _configuration = config;
   }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7dd0ea55/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java b/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
index a5abbc1..cc4c923 100644
--- a/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
+++ b/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
@@ -211,10 +211,13 @@ public class ThriftBlurShardServer extends ThriftServer {
     shardServer.init();
 
     Iface iface = BlurUtil.recordMethodCallsAndAverageTimes(shardServer, Iface.class);
-    WebAppContext context = httpServer.getContext();
-    context.addServlet(new ServletHolder(new TServlet(new Blur.Processor<Blur.Iface>(iface), new TJSONProtocol.Factory())), "/blur");
-    context.addServlet(new ServletHolder(new JSONReporterServlet()), "/livemetrics");
-    JSONReporter.enable("json-reporter", 1, TimeUnit.SECONDS, 60);
+    if (httpServer != null) {
+      WebAppContext context = httpServer.getContext();
+      context.addServlet(new ServletHolder(new TServlet(new Blur.Processor<Blur.Iface>(iface),
+          new TJSONProtocol.Factory())), "/blur");
+      context.addServlet(new ServletHolder(new JSONReporterServlet()), "/livemetrics");
+      JSONReporter.enable("json-reporter", 1, TimeUnit.SECONDS, 60);
+    }
 
     int threadCount = configuration.getInt(BLUR_SHARD_SERVER_THRIFT_THREAD_COUNT, 32);
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7dd0ea55/src/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java b/src/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java
index 9146a80..39a44b5 100644
--- a/src/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java
+++ b/src/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java
@@ -18,6 +18,7 @@ package org.apache.blur.thrift;
  */
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 
 import java.io.File;
 import java.io.IOException;
@@ -79,6 +80,10 @@ public class BlurClusterTest {
   public static void shutdownCluster() {
     MiniCluster.shutdownBlurCluster();
   }
+  
+  private Iface getClient() {
+    return BlurClient.getClient(MiniCluster.getControllerConnectionStr());
+  }
 
   @Test
   public void testCreateTable() throws BlurException, TException, IOException {
@@ -92,10 +97,6 @@ public class BlurClusterTest {
     assertEquals(Arrays.asList("test"), tableList);
   }
 
-  private Iface getClient() {
-    return BlurClient.getClient(MiniCluster.getControllerConnectionStr());
-  }
-
   @Test
   public void testLoadTable() throws BlurException, TException, InterruptedException {
     Iface client = getClient();
@@ -119,4 +120,23 @@ public class BlurClusterTest {
     BlurResults results = client.query("test", blurQuery);
     assertEquals(length, results.getTotalResults());
   }
+
+  @Test
+  public void testCreateDisableAndRemoveTable() throws IOException, BlurException, TException {
+    Iface client = getClient();
+    String tableName = UUID.randomUUID().toString();
+    TableDescriptor tableDescriptor = new TableDescriptor();
+    tableDescriptor.setName(tableName);
+    tableDescriptor.setShardCount(5);
+    tableDescriptor.setTableUri(MiniCluster.getFileSystemUri().toString() + "/blur/" + tableName);
+
+    for (int i = 0; i < 3; i++) {
+      client.createTable(tableDescriptor);
+      client.disableTable(tableName);
+      client.removeTable(tableName, true);
+    }
+
+    assertFalse(client.tableList().contains(tableName));
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7dd0ea55/src/blur-core/src/test/java/org/apache/blur/thrift/BlurShardServerTest.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/test/java/org/apache/blur/thrift/BlurShardServerTest.java b/src/blur-core/src/test/java/org/apache/blur/thrift/BlurShardServerTest.java
index 7f78ba4..72e9a33 100644
--- a/src/blur-core/src/test/java/org/apache/blur/thrift/BlurShardServerTest.java
+++ b/src/blur-core/src/test/java/org/apache/blur/thrift/BlurShardServerTest.java
@@ -28,6 +28,7 @@ import org.apache.blur.manager.IndexManager;
 import org.apache.blur.manager.IndexServer;
 import org.apache.blur.manager.writer.BlurIndex;
 import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.ShardState;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.lucene.search.similarities.Similarity;
 import org.junit.After;
@@ -139,6 +140,11 @@ public class BlurShardServerTest {
       public SortedSet<String> getShardListCurrentServerOnly(String table) throws IOException {
         throw new RuntimeException("no impl");
       }
+
+      @Override
+      public Map<String, ShardState> getShardState(String table) {
+        throw new RuntimeException("not impl");
+      }
     };
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7dd0ea55/src/blur-core/src/test/java/org/apache/blur/thrift/DoNothingServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/test/java/org/apache/blur/thrift/DoNothingServer.java b/src/blur-core/src/test/java/org/apache/blur/thrift/DoNothingServer.java
index 56e3156..cc395d0 100644
--- a/src/blur-core/src/test/java/org/apache/blur/thrift/DoNothingServer.java
+++ b/src/blur-core/src/test/java/org/apache/blur/thrift/DoNothingServer.java
@@ -21,6 +21,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.blur.thrift.generated.Blur.Iface;
 import org.apache.blur.thrift.generated.BlurException;
 import org.apache.blur.thrift.generated.BlurQuery;
 import org.apache.blur.thrift.generated.BlurQueryStatus;
@@ -29,9 +30,9 @@ import org.apache.blur.thrift.generated.FetchResult;
 import org.apache.blur.thrift.generated.RowMutation;
 import org.apache.blur.thrift.generated.Schema;
 import org.apache.blur.thrift.generated.Selector;
+import org.apache.blur.thrift.generated.ShardState;
 import org.apache.blur.thrift.generated.TableDescriptor;
 import org.apache.blur.thrift.generated.TableStats;
-import org.apache.blur.thrift.generated.Blur.Iface;
 import org.apache.thrift.TException;
 
 
@@ -171,4 +172,9 @@ public class DoNothingServer implements Iface {
   public Map<String, String> configuration() throws BlurException, TException {
     return null;
   }
+
+  @Override
+  public Map<String, Map<String, ShardState>> shardServerLayoutState(String table) throws BlurException, TException {
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7dd0ea55/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockDirectory.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockDirectory.java b/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockDirectory.java
index 8146830..dfdba07 100644
--- a/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockDirectory.java
+++ b/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockDirectory.java
@@ -82,6 +82,7 @@ public class BlockDirectory extends Directory {
   private String _dirName;
   private Cache _cache;
   private Set<String> _blockCacheFileTypes;
+  private boolean closed = false;
 
   public BlockDirectory(String dirName, Directory directory) throws IOException {
     this(dirName, directory, NO_CACHE);
@@ -91,7 +92,8 @@ public class BlockDirectory extends Directory {
     this(dirName, directory, cache, null);
   }
 
-  public BlockDirectory(String dirName, Directory directory, Cache cache, Set<String> blockCacheFileTypes) throws IOException {
+  public BlockDirectory(String dirName, Directory directory, Cache cache, Set<String> blockCacheFileTypes)
+      throws IOException {
     _dirName = dirName;
     _directory = directory;
     _blockSize = BLOCK_SIZE;
@@ -130,7 +132,8 @@ public class BlockDirectory extends Directory {
     private String _cacheName;
     private Cache _cache;
 
-    public CachedIndexInput(IndexInput source, int blockSize, String name, String cacheName, Cache cache, IOContext context) {
+    public CachedIndexInput(IndexInput source, int blockSize, String name, String cacheName, Cache cache,
+        IOContext context) {
       super(name, context);
       _source = source;
       _blockSize = blockSize;
@@ -179,7 +182,8 @@ public class BlockDirectory extends Directory {
       return lengthToReadInBlock;
     }
 
-    private void readIntoCacheAndResult(long blockId, int blockOffset, byte[] b, int off, int lengthToReadInBlock) throws IOException {
+    private void readIntoCacheAndResult(long blockId, int blockOffset, byte[] b, int off, int lengthToReadInBlock)
+        throws IOException {
       long position = getRealPosition(blockId, 0);
       int length = (int) Math.min(_blockSize, _fileLength - position);
       _source.seek(position);
@@ -203,11 +207,14 @@ public class BlockDirectory extends Directory {
 
   @Override
   public void close() throws IOException {
-    String[] files = listAll();
-    for (String file : files) {
-      _cache.delete(getFileCacheName(file));
+    if (!closed) {
+      String[] files = listAll();
+      for (String file : files) {
+        _cache.delete(getFileCacheName(file));
+      }
+      _directory.close();
+      closed = true;
     }
-    _directory.close();
   }
 
   String getFileCacheLocation(String name) {
@@ -221,7 +228,7 @@ public class BlockDirectory extends Directory {
   private long getFileModified(String name) throws IOException {
     if (_directory instanceof FSDirectory) {
       File directory = ((FSDirectory) _directory).getDirectory();
-      File file = new File(directory,name);
+      File file = new File(directory, name);
       if (!file.exists()) {
         throw new FileNotFoundException("File [" + name + "] not found");
       }
@@ -274,9 +281,10 @@ public class BlockDirectory extends Directory {
   @Override
   public IndexOutput createOutput(String name, IOContext context) throws IOException {
     IndexOutput dest = _directory.createOutput(name, context);
-//    if (_blockCacheFileTypes == null || isCachableFile(name)) {
-//      return new CachedIndexOutput(this, dest, _blockSize, name, _cache, _blockSize);
-//    }
+    // if (_blockCacheFileTypes == null || isCachableFile(name)) {
+    // return new CachedIndexOutput(this, dest, _blockSize, name, _cache,
+    // _blockSize);
+    // }
     return dest;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7dd0ea55/src/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java b/src/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
index 0e92867..2e5fa98 100644
--- a/src/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
+++ b/src/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
@@ -287,7 +287,7 @@ public class HdfsDirectory extends Directory {
 
   @Override
   public void close() throws IOException {
-    fileSystem.close();
+
   }
 
   private Path getPath(String name) {


Mime
View raw message