incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject git commit: Fixed BLUR-399.
Date Tue, 30 Dec 2014 01:03:18 GMT
Repository: incubator-blur
Updated Branches:
  refs/heads/master c60ed1167 -> e9724d30a


Fixed BLUR-399.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/e9724d30
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/e9724d30
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/e9724d30

Branch: refs/heads/master
Commit: e9724d30a039c4aee2d41ef4b13f24ab3c848ddb
Parents: c60ed11
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Mon Dec 29 20:03:08 2014 -0500
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Mon Dec 29 20:03:08 2014 -0500

----------------------------------------------------------------------
 .../org/apache/blur/console/RunMiniCluster.java |   4 +-
 .../clusterstatus/ZookeeperClusterStatus.java   |   3 +
 .../org/apache/blur/ExternalThriftServer.java   |  95 +++++
 .../test/java/org/apache/blur/MiniCluster.java  | 377 ++++++++++++++-----
 .../java/org/apache/blur/MiniClusterServer.java |  29 ++
 .../apache/blur/command/RunSlowForTesting.java  |  49 +++
 .../org/apache/blur/thrift/BlurClusterTest.java | 150 +++++++-
 .../org/apache/blur/thrift/SuiteCluster.java    |   2 +-
 .../services/org.apache.blur.command.Commands   |   3 +-
 .../lib/BlurOutputFormatMiniClusterTest.java    |   6 +-
 .../lib/BlurOutputFormatMiniClusterTest.java    |  12 +-
 .../hdfs_v2/FastHdfsKeyValueDirectoryTest.java  |  24 +-
 .../java/org/apache/blur/BlurConfiguration.java |  46 ++-
 13 files changed, 673 insertions(+), 127 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e9724d30/blur-console/src/test/java/org/apache/blur/console/RunMiniCluster.java
----------------------------------------------------------------------
diff --git a/blur-console/src/test/java/org/apache/blur/console/RunMiniCluster.java b/blur-console/src/test/java/org/apache/blur/console/RunMiniCluster.java
index 0e62097..ecc781b 100644
--- a/blur-console/src/test/java/org/apache/blur/console/RunMiniCluster.java
+++ b/blur-console/src/test/java/org/apache/blur/console/RunMiniCluster.java
@@ -32,7 +32,7 @@ public class RunMiniCluster {
   private static final File TMPDIR = new File(System.getProperty("blur.tmp.dir", "./target/mini-cluster"));
 
   public static void main(String[] args) throws IOException {
-    //GCWatcher.init(0.60);
+    // GCWatcher.init(0.60);
     LocalFileSystem localFS = FileSystem.getLocal(new Configuration());
     File testDirectory = new File(TMPDIR, "blur-cluster-test").getAbsoluteFile();
     testDirectory.mkdirs();
@@ -51,7 +51,7 @@ public class RunMiniCluster {
     System.setProperty("dfs.datanode.data.dir.perm", dirPermissionNum);
     testDirectory.delete();
     MiniCluster miniCluster = new MiniCluster();
-    miniCluster.startBlurCluster(new File(testDirectory, "cluster").getAbsolutePath(), 2, 3, true);
+    miniCluster.startBlurCluster(new File(testDirectory, "cluster").getAbsolutePath(), 2, 3, true, false);
 
     System.out.println("ZK Connection String = [" + miniCluster.getZkConnectionString() + "]");
     System.out.println("Controller Connection String = [" + miniCluster.getControllerConnectionStr() + "]");

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e9724d30/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatus.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatus.java b/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatus.java
index e97e50b..469d92d 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatus.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatus.java
@@ -36,6 +36,7 @@ import org.apache.blur.BlurConfiguration;
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
 import org.apache.blur.lucene.search.FairSimilarity;
+import org.apache.blur.server.TableContext;
 import org.apache.blur.thirdparty.thrift_0_9_0.TDeserializer;
 import org.apache.blur.thirdparty.thrift_0_9_0.TException;
 import org.apache.blur.thirdparty.thrift_0_9_0.TSerializer;
@@ -158,6 +159,7 @@ public class ZookeeperClusterStatus extends ClusterStatus {
           watch.close();
         }
         _tableDescriptorCache.remove(table);
+        TableContext.clear(table);
       }
       for (final String table : newTables) {
         final String clusterTableKey = getClusterTableKey(_cluster, table);
@@ -167,6 +169,7 @@ public class ZookeeperClusterStatus extends ClusterStatus {
           public void action(byte[] data) {
             runActions();
             _tableDescriptorCache.remove(table);
+            TableContext.clear(table);
           }
         });
         if (_enabledWatchNodeExistance.putIfAbsent(clusterTableKey, enabledWatcher) != null) {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e9724d30/blur-core/src/test/java/org/apache/blur/ExternalThriftServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/ExternalThriftServer.java b/blur-core/src/test/java/org/apache/blur/ExternalThriftServer.java
new file mode 100644
index 0000000..d8547d3
--- /dev/null
+++ b/blur-core/src/test/java/org/apache/blur/ExternalThriftServer.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur;
+
+import java.io.File;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import org.apache.blur.thirdparty.thrift_0_9_0.transport.TTransportException;
+import org.apache.blur.thrift.ThriftBlurControllerServer;
+import org.apache.blur.thrift.ThriftBlurShardServer;
+import org.apache.blur.thrift.ThriftServer;
+
+public class ExternalThriftServer {
+
+  public static void main(String[] args) throws Exception {
+
+    // Properties properties = System.getProperties();
+    // Set<Object> keySet = properties.keySet();
+    // Map<String, String> map = new TreeMap<String, String>();
+    // for (Object o : keySet) {
+    // Object object = properties.get(o);
+    // map.put(o.toString(), object.toString());
+    // }
+    // for (Entry<String, String> e : map.entrySet()) {
+    // System.out.println(e.getKey() + " " + e.getValue());
+    // }
+
+    String type = args[0];
+    Integer serverIndex = Integer.parseInt(args[1]);
+    String configPath = args[2];
+    File path = new File(configPath);
+    System.out.println("Loading config from [" + path + "]");
+    BlurConfiguration configuration = new BlurConfiguration(false);
+    configuration.load(path);
+    System.out.println("Configuration Loaded");
+    Map<String, String> properties = new TreeMap<String, String>(configuration.getProperties());
+    for (Entry<String, String> e : properties.entrySet()) {
+      System.out.println(e.getKey() + "=>" + e.getValue());
+    }
+
+    final ThriftServer thriftServer;
+    if (type.equals("shard")) {
+      thriftServer = ThriftBlurShardServer.createServer(serverIndex, configuration);
+    } else if (type.equals("controller")) {
+      thriftServer = ThriftBlurControllerServer.createServer(serverIndex, configuration);
+    } else {
+      throw new RuntimeException("Unknown type [" + type + "]");
+    }
+    new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          thriftServer.start();
+        } catch (TTransportException e) {
+          e.printStackTrace();
+        }
+      }
+    }).start();
+    while (true) {
+      try {
+        Thread.sleep(50);
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+      int localPort = thriftServer.getLocalPort();
+      if (localPort == 0) {
+        continue;
+      } else {
+        try {
+          Thread.sleep(500);
+        } catch (InterruptedException e) {
+          throw new RuntimeException(e);
+        }
+        break;
+      }
+    }
+    System.out.println("ONLINE");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e9724d30/blur-core/src/test/java/org/apache/blur/MiniCluster.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/MiniCluster.java b/blur-core/src/test/java/org/apache/blur/MiniCluster.java
index 7f3ee2c..114f0d9 100644
--- a/blur-core/src/test/java/org/apache/blur/MiniCluster.java
+++ b/blur-core/src/test/java/org/apache/blur/MiniCluster.java
@@ -17,30 +17,30 @@ package org.apache.blur;
  * limitations under the License.
  */
 
-import static org.apache.blur.utils.BlurConstants.BLUR_GUI_CONTROLLER_PORT;
-import static org.apache.blur.utils.BlurConstants.BLUR_GUI_SHARD_PORT;
-import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_BLOCKCACHE_DIRECT_MEMORY_ALLOCATION;
-import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_BLOCKCACHE_SLAB_COUNT;
-import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_HOSTNAME;
-import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_SAFEMODEDELAY;
-import static org.apache.blur.utils.BlurConstants.BLUR_ZOOKEEPER_CONNECTION;
-import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_BIND_PORT;
-import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_BIND_PORT;
+import static org.apache.blur.utils.BlurConstants.*;
 
+import java.io.BufferedReader;
+import java.io.Closeable;
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
 import java.lang.reflect.Field;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
@@ -72,6 +72,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
@@ -82,16 +83,16 @@ public class MiniCluster {
   private static Log LOG = LogFactory.getLog(MiniCluster.class);
   private MiniDFSCluster cluster;
   private ZkMiniCluster zkMiniCluster = new ZkMiniCluster();
-  private List<ThriftServer> controllers = new ArrayList<ThriftServer>();
-  private List<ThriftServer> shards = new ArrayList<ThriftServer>();
+  private List<MiniClusterServer> controllers = new ArrayList<MiniClusterServer>();
+  private List<MiniClusterServer> shards = new ArrayList<MiniClusterServer>();
 
   public static void main(String[] args) throws IOException, InterruptedException, KeeperException, BlurException,
       TException {
     MiniCluster miniCluster = new MiniCluster();
     miniCluster.startDfs("./tmp/hdfs");
     miniCluster.startZooKeeper("./tmp/zk");
-    miniCluster.startControllers(1, false);
-    miniCluster.startShards(1, false);
+    miniCluster.startControllers(1, false, false);
+    miniCluster.startShards(1, false, false);
 
     try {
       Iface client = BlurClient.getClient(miniCluster.getControllerConnectionStr());
@@ -122,26 +123,29 @@ public class MiniCluster {
   }
 
   public void startBlurCluster(String path, int controllerCount, int shardCount) {
-    startBlurCluster(path, controllerCount, shardCount, false);
+    startBlurCluster(path, controllerCount, shardCount, false, false);
   }
 
-  public void startBlurCluster(String path, int controllerCount, int shardCount, boolean randomPort) {
+  public void startBlurCluster(String path, int controllerCount, int shardCount, boolean randomPort,
+      boolean externalProcesses) {
     MemoryReporter.enable();
     startDfs(path + "/hdfs");
     startZooKeeper(path + "/zk", randomPort);
     setupBuffers();
-    startControllers(controllerCount, randomPort);
-    startShards(shardCount, randomPort);
+    startControllers(controllerCount, randomPort, externalProcesses);
+    startShards(shardCount, randomPort, externalProcesses);
     try {
       waitForSafeModeToExit();
     } catch (BlurException e) {
       throw new RuntimeException(e);
     } catch (TException e) {
       throw new RuntimeException(e);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
     }
   }
 
-  private void waitForSafeModeToExit() throws BlurException, TException {
+  private void waitForSafeModeToExit() throws BlurException, TException, IOException {
     String controllerConnectionStr = getControllerConnectionStr();
     Iface client = BlurClient.getClient(controllerConnectionStr);
     String clusterName = "default";
@@ -186,16 +190,40 @@ public class MiniCluster {
   }
 
   public String getControllerConnectionStr() {
-    StringBuilder builder = new StringBuilder();
-    for (ThriftServer server : controllers) {
-      if (builder.length() != 0) {
-        builder.append(',');
+    String zkConnectionString = zkMiniCluster.getZkConnectionString();
+    ZooKeeper zk;
+    try {
+      zk = new ZooKeeperClient(zkConnectionString, 30000, new Watcher() {
+        @Override
+        public void process(WatchedEvent event) {
+
+        }
+      });
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    String onlineControllersPath = ZookeeperPathConstants.getOnlineControllersPath();
+    try {
+      List<String> children = zk.getChildren(onlineControllersPath, false);
+      StringBuilder builder = new StringBuilder();
+      for (String s : children) {
+        if (builder.length() != 0) {
+          builder.append(',');
+        }
+        builder.append(s);
+      }
+      return builder.toString();
+    } catch (KeeperException e) {
+      throw new RuntimeException(e);
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    } finally {
+      try {
+        zk.close();
+      } catch (InterruptedException e) {
+        LOG.error("Unkown error while trying to close ZooKeeper client.", e);
       }
-      String hostName = server.getServerTransport().getBindAddr().getHostName();
-      int localPort = server.getServerTransport().getServerSocket().getLocalPort();
-      builder.append(hostName + ":" + localPort);
     }
-    return builder.toString();
   }
 
   private void addRow(String table, int i, Iface client) throws BlurException, TException {
@@ -220,20 +248,16 @@ public class MiniCluster {
   }
 
   public void stopControllers() {
-    for (ThriftServer s : controllers) {
-      s.close();
-    }
+    IOUtils.cleanup(LOG, controllers.toArray(new Closeable[] {}));
   }
 
   public void stopShards() {
-    for (ThriftServer s : shards) {
-      s.close();
-    }
+    IOUtils.cleanup(LOG, shards.toArray(new Closeable[] {}));
   }
 
-  public void startControllers(int num, boolean randomPort) {
+  public void startControllers(int num, boolean randomPort, boolean externalProcesses) {
     BlurConfiguration configuration = getBlurConfiguration();
-    startControllers(configuration, num, randomPort);
+    startControllers(configuration, num, randomPort, externalProcesses);
   }
 
   private BlurConfiguration getBlurConfiguration(BlurConfiguration overrides) {
@@ -259,21 +283,20 @@ public class MiniCluster {
     configuration.setLong(BLUR_SHARD_SAFEMODEDELAY, 5000);
     configuration.setInt(BLUR_GUI_CONTROLLER_PORT, -1);
     configuration.setInt(BLUR_GUI_SHARD_PORT, -1);
-    configuration.set("blur.fieldtype.customtype1", TestType.class.getName());
 
     return configuration;
   }
 
-  public void startControllers(BlurConfiguration configuration, int num, boolean randomPort) {
+  public void startControllers(BlurConfiguration configuration, int num, boolean randomPort, boolean externalProcesses) {
     BlurConfiguration localConf = getBlurConfiguration(configuration);
     if (randomPort) {
       localConf.setInt(BLUR_CONTROLLER_BIND_PORT, 0);
     }
     for (int i = 0; i < num; i++) {
       try {
-        ThriftServer server = ThriftBlurControllerServer.createServer(i, localConf);
-        controllers.add(server);
-        startServer(server);
+        MiniClusterServer miniClusterServer = toMiniClusterServer(i, TYPE.controller, localConf, externalProcesses);
+        controllers.add(miniClusterServer);
+        startServer(miniClusterServer);
       } catch (Exception e) {
         LOG.error(e);
         throw new RuntimeException(e);
@@ -281,30 +304,245 @@ public class MiniCluster {
     }
   }
 
-  public void startShards(int num, boolean randomPort) {
+  enum TYPE {
+    shard, controller
+  }
+
+  private MiniClusterServer toMiniClusterServer(int serverIndex, TYPE type, final BlurConfiguration configuration,
+      boolean externalProcesses) {
+    if (externalProcesses) {
+      return toMiniClusterServerExternal(serverIndex, type, configuration);
+    } else {
+      return toMiniClusterServerInternal(serverIndex, type, configuration);
+    }
+  }
+
+  private MiniClusterServer toMiniClusterServerExternal(final int serverIndex, final TYPE type,
+      BlurConfiguration configuration) {
+    // write out configuration to tmp file.
+    // build args
+    // build class path
+    try {
+      File dir = new File("./target/blurtesttemp");
+      dir.mkdirs();
+      final File file = new File(dir, "blurtestconf." + UUID.randomUUID().toString() + ".properties").getAbsoluteFile();
+      OutputStream outputStream = new FileOutputStream(file);
+      configuration.write(outputStream);
+      outputStream.close();
+
+      System.out.println("File [" + file.getAbsolutePath() + "] exists [" + file.exists() + "]");
+
+      Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+        @Override
+        public void run() {
+          file.delete();
+        }
+      }));
+
+      String javaHome = System.getProperty("java.home");
+      String classPath = System.getProperty("java.class.path");
+
+      List<String> command = new ArrayList<String>();
+      command.add(javaHome + "/bin/java");
+      command.add("-Xmx256m");
+      command.add("-Xms256m");
+      command.add("-cp");
+      command.add(classPath);
+      command.add(ExternalThriftServer.class.getName());
+      command.add(type.name());
+      command.add(Integer.toString(serverIndex));
+      command.add(file.getAbsolutePath());
+      final ProcessBuilder builder = new ProcessBuilder(command);
+
+      final MiniClusterServer miniClusterServer = new MiniClusterServer() {
+
+        private Process _process;
+        private AtomicBoolean _online = new AtomicBoolean();
+
+        @Override
+        public void close() throws IOException {
+          kill();
+        }
+
+        @Override
+        public void waitUntilOnline() {
+          while (!_online.get()) {
+            try {
+              Thread.sleep(100);
+            } catch (InterruptedException e) {
+              LOG.error("Unknown error while trying to wait for process to come online.", e);
+            }
+          }
+        }
+
+        @Override
+        public void start() {
+          try {
+            _process = builder.start();
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+          pipeOutputToStdOut(_process.getInputStream());
+          pipeOutputToStdOut(_process.getErrorStream());
+        }
+
+        private void pipeOutputToStdOut(final InputStream inputStream) {
+          new Thread(new Runnable() {
+            @Override
+            public void run() {
+              BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
+              String line;
+              try {
+                while ((line = reader.readLine()) != null) {
+                  LOG.info("Process Output Type [" + type + "] Index [" + serverIndex + "] Line [" + line + "]");
+                  if (line.trim().equals("ONLINE")) {
+                    _online.set(true);
+                  }
+                }
+              } catch (IOException e) {
+                LOG.error("Unknown error while trying to follow input stream.", e);
+              }
+            }
+          }).start();
+        }
+
+        @Override
+        public void kill() {
+          if (_process != null) {
+            _process.destroy();
+          }
+        }
+      };
+      Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+        @Override
+        public void run() {
+          miniClusterServer.kill();
+        }
+      }));
+      return miniClusterServer;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+  }
+
+  private MiniClusterServer toMiniClusterServerInternal(int serverIndex, TYPE type,
+      final BlurConfiguration configuration) {
+    final ThriftServer thriftServer;
+    try {
+      thriftServer = getThriftServer(serverIndex, type, configuration);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    return new MiniClusterServer() {
+
+      @Override
+      public void close() throws IOException {
+        thriftServer.close();
+      }
+
+      @Override
+      public void kill() {
+        ZooKeeper zk = null;
+        try {
+          int shardPort = thriftServer.getServerTransport().getServerSocket().getLocalPort();
+          String nodeNameHostname = ThriftServer.getNodeName(configuration, BLUR_SHARD_HOSTNAME);
+          String nodeName = nodeNameHostname + ":" + shardPort;
+          zk = new ZooKeeperClient(getZkConnectionString(), 30000, new Watcher() {
+            @Override
+            public void process(WatchedEvent event) {
+
+            }
+          });
+          String onlineShardsPath = ZookeeperPathConstants
+              .getOnlineShardsPath(org.apache.blur.utils.BlurConstants.BLUR_CLUSTER);
+          String path = onlineShardsPath + "/" + nodeName;
+          zk.delete(path, -1);
+          zk.close();
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        } finally {
+          if (zk != null) {
+            try {
+              zk.close();
+            } catch (InterruptedException e) {
+              LOG.error("Unknown error while trying to close ZooKeeper client.", e);
+            }
+          }
+        }
+      }
+
+      @Override
+      public void start() {
+        try {
+          thriftServer.start();
+        } catch (TTransportException e) {
+          LOG.error(e);
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      public void waitUntilOnline() {
+        while (true) {
+          try {
+            Thread.sleep(50);
+          } catch (InterruptedException e) {
+            return;
+          }
+          int localPort = thriftServer.getLocalPort();
+          if (localPort == 0) {
+            continue;
+          } else {
+            try {
+              Thread.sleep(500);
+            } catch (InterruptedException e) {
+              LOG.error("Unknown error", e);
+            }
+            return;
+          }
+        }
+      }
+
+    };
+  }
+
+  private ThriftServer getThriftServer(int serverIndex, TYPE type, BlurConfiguration configuration) throws Exception {
+    switch (type) {
+    case controller:
+      return ThriftBlurControllerServer.createServer(serverIndex, configuration);
+    case shard:
+      return ThriftBlurShardServer.createServer(serverIndex, configuration);
+    default:
+      throw new RuntimeException("Type not supported [" + type + "]");
+    }
+  }
+
+  public void startShards(int num, boolean randomPort, boolean externalProcesses) {
     BlurConfiguration configuration = getBlurConfiguration();
-    startShards(configuration, num, randomPort);
+    startShards(configuration, num, randomPort, externalProcesses);
   }
 
-  public void startShards(final BlurConfiguration configuration, int num, final boolean randomPort) {
+  public void startShards(final BlurConfiguration configuration, int num, final boolean randomPort,
+      final boolean externalProcesses) {
     final BlurConfiguration localConf = getBlurConfiguration(configuration);
     if (randomPort) {
       localConf.setInt(BLUR_SHARD_BIND_PORT, 0);
     }
     ExecutorService executorService = Executors.newFixedThreadPool(num);
-    List<Future<ThriftServer>> futures = new ArrayList<Future<ThriftServer>>();
+    List<Future<MiniClusterServer>> futures = new ArrayList<Future<MiniClusterServer>>();
     for (int i = 0; i < num; i++) {
       final int index = i;
-      futures.add(executorService.submit(new Callable<ThriftServer>() {
+      futures.add(executorService.submit(new Callable<MiniClusterServer>() {
         @Override
-        public ThriftServer call() throws Exception {
-          return ThriftBlurShardServer.createServer(index, localConf);
+        public MiniClusterServer call() throws Exception {
+          return toMiniClusterServer(index, TYPE.shard, localConf, externalProcesses);
         }
       }));
     }
     for (int i = 0; i < num; i++) {
       try {
-        ThriftServer server = futures.get(i).get();
+        MiniClusterServer server = futures.get(i).get();
         shards.add(server);
         startServer(server);
       } catch (Exception e) {
@@ -320,53 +558,18 @@ public class MiniCluster {
 
   public void killShardServer(final BlurConfiguration configuration, int shardServer) throws IOException,
       InterruptedException, KeeperException {
-    ThriftServer thriftServer = shards.get(shardServer);
-    int shardPort = thriftServer.getServerTransport().getServerSocket().getLocalPort();
-    String nodeNameHostname = ThriftServer.getNodeName(configuration, BLUR_SHARD_HOSTNAME);
-    String nodeName = nodeNameHostname + ":" + shardPort;
-    ZooKeeper zk = new ZooKeeperClient(getZkConnectionString(), 30000, new Watcher() {
-      @Override
-      public void process(WatchedEvent event) {
-
-      }
-    });
-    String onlineShardsPath = ZookeeperPathConstants
-        .getOnlineShardsPath(org.apache.blur.utils.BlurConstants.BLUR_CLUSTER);
-    String path = onlineShardsPath + "/" + nodeName;
-    zk.delete(path, -1);
-    zk.close();
+    MiniClusterServer miniClusterServer = shards.get(shardServer);
+    miniClusterServer.kill();
   }
 
-  private static void startServer(final ThriftServer server) {
+  private static void startServer(final MiniClusterServer server) {
     new Thread(new Runnable() {
       @Override
       public void run() {
-        try {
-          server.start();
-        } catch (TTransportException e) {
-          LOG.error(e);
-          throw new RuntimeException(e);
-        }
+        server.start();
       }
     }).start();
-    while (true) {
-      try {
-        Thread.sleep(50);
-      } catch (InterruptedException e) {
-        return;
-      }
-      int localPort = server.getLocalPort();
-      if (localPort == 0) {
-        continue;
-      } else {
-        try {
-          Thread.sleep(500);
-        } catch (InterruptedException e) {
-          LOG.error("Unknown error", e);
-        }
-        return;
-      }
-    }
+    server.waitUntilOnline();
   }
 
   public String getZkConnectionString() {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e9724d30/blur-core/src/test/java/org/apache/blur/MiniClusterServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/MiniClusterServer.java b/blur-core/src/test/java/org/apache/blur/MiniClusterServer.java
new file mode 100644
index 0000000..99ccf49
--- /dev/null
+++ b/blur-core/src/test/java/org/apache/blur/MiniClusterServer.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur;
+
+import java.io.Closeable;
+
+public interface MiniClusterServer extends Closeable {
+
+  void kill();
+
+  void start();
+
+  void waitUntilOnline();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e9724d30/blur-core/src/test/java/org/apache/blur/command/RunSlowForTesting.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/command/RunSlowForTesting.java b/blur-core/src/test/java/org/apache/blur/command/RunSlowForTesting.java
new file mode 100644
index 0000000..82ebd1f
--- /dev/null
+++ b/blur-core/src/test/java/org/apache/blur/command/RunSlowForTesting.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.command;
+
+import java.io.IOException;
+
+import org.apache.blur.command.annotation.OptionalArgument;
+import org.apache.blur.command.commandtype.IndexReadCommandSingleTable;
+import org.apache.blur.manager.IndexManager;
+
+public class RunSlowForTesting extends IndexReadCommandSingleTable<Boolean> {
+
+  @OptionalArgument("Set run slow flag.")
+  private boolean runSlow = true;
+
+  @Override
+  public Boolean execute(IndexContext context) throws IOException, InterruptedException {
+    IndexManager.DEBUG_RUN_SLOW.set(runSlow);
+    return true;
+  }
+
+  @Override
+  public String getName() {
+    return "RunSlowForTesting";
+  }
+
+  public boolean isRunSlow() {
+    return runSlow;
+  }
+
+  public void setRunSlow(boolean runSlow) {
+    this.runSlow = runSlow;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e9724d30/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java b/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java
index d66cf1a..79e4b9f 100644
--- a/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java
+++ b/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java
@@ -34,6 +34,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -42,7 +45,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.blur.MiniCluster;
 import org.apache.blur.TestType;
 import org.apache.blur.analysis.FieldManager;
-import org.apache.blur.manager.IndexManager;
+import org.apache.blur.command.RunSlowForTesting;
 import org.apache.blur.server.TableContext;
 import org.apache.blur.thirdparty.thrift_0_9_0.TException;
 import org.apache.blur.thrift.generated.Blur;
@@ -74,6 +77,7 @@ import org.apache.blur.user.UserContext;
 import org.apache.blur.utils.BlurConstants;
 import org.apache.blur.utils.GCWatcher;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
@@ -89,8 +93,10 @@ public class BlurClusterTest {
 
   private static final File TMPDIR = new File(System.getProperty("blur.tmp.dir", "./target/tmp_BlurClusterTest"));
   private static MiniCluster miniCluster;
+  private static boolean externalProcesses = true;
 
   private int numberOfDocs = 1000;
+  private String controllerConnectionStr;
 
   @BeforeClass
   public static void startCluster() throws IOException {
@@ -113,7 +119,7 @@ public class BlurClusterTest {
     System.setProperty("dfs.datanode.data.dir.perm", dirPermissionNum);
     testDirectory.delete();
     miniCluster = new MiniCluster();
-    miniCluster.startBlurCluster(new File(testDirectory, "cluster").getAbsolutePath(), 2, 3, true);
+    miniCluster.startBlurCluster(new File(testDirectory, "cluster").getAbsolutePath(), 2, 3, true, externalProcesses);
   }
 
   @AfterClass
@@ -132,7 +138,10 @@ public class BlurClusterTest {
   }
 
   private Iface getClient() {
-    return BlurClient.getClient(miniCluster.getControllerConnectionStr());
+    if (controllerConnectionStr == null) {
+      controllerConnectionStr = miniCluster.getControllerConnectionStr();
+    }
+    return BlurClient.getClient(controllerConnectionStr);
   }
 
   @Test
@@ -240,6 +249,7 @@ public class BlurClusterTest {
     tableDescriptor.setName("test_type");
     tableDescriptor.setShardCount(1);
     tableDescriptor.setTableUri(miniCluster.getFileSystemUri().toString() + "/blur/test_type");
+    tableDescriptor.putToTableProperties("blur.fieldtype.customtype1", TestType.class.getName());
     client.createTable(tableDescriptor);
     List<String> tableList = client.tableList();
     assertTrue(tableList.contains("test_type"));
@@ -288,6 +298,7 @@ public class BlurClusterTest {
     columnDefinition.setFieldLessIndexed(true);
     columnDefinition.setFieldType("string");
     columnDefinition.setSortable(true);
+    columnDefinition.setProperties(new HashMap<String, String>());
     client.addColumnDefinition(tableName, columnDefinition);
     long s = System.nanoTime();
     client.mutateBatch(mutations);
@@ -466,7 +477,6 @@ public class BlurClusterTest {
 
   }
 
- 
   @Test
   public void testBatchFetch() throws BlurException, TException, InterruptedException, IOException {
     String tableName = "testBatchFetch";
@@ -508,9 +518,9 @@ public class BlurClusterTest {
     String uuid = "5678";
     blurQueryRow.setUuid(uuid);
     final User user = new User("testuser", new HashMap<String, String>());
-    
+
     try {
-      IndexManager.DEBUG_RUN_SLOW.set(true);
+      setDebugRunSlow(tableName, true);
       new Thread(new Runnable() {
         @Override
         public void run() {
@@ -519,7 +529,7 @@ public class BlurClusterTest {
             // This call will take several seconds to execute.
             client.query(tableName, blurQueryRow);
           } catch (BlurException e) {
-//            e.printStackTrace();
+            // e.printStackTrace();
           } catch (TException e) {
             e.printStackTrace();
           }
@@ -531,7 +541,7 @@ public class BlurClusterTest {
       assertEquals(queryStatusById.getState(), QueryState.RUNNING);
       client.cancelQuery(tableName, uuid);
     } finally {
-      IndexManager.DEBUG_RUN_SLOW.set(false);
+      setDebugRunSlow(tableName, false);
     }
   }
 
@@ -544,7 +554,7 @@ public class BlurClusterTest {
     try {
       // This will make each collect in the collectors pause 250 ms per collect
       // call
-      IndexManager.DEBUG_RUN_SLOW.set(true);
+      setDebugRunSlow(tableName, true);
       final BlurQuery blurQueryRow = new BlurQuery();
       Query queryRow = new Query();
       queryRow.setQuery("test.test:value");
@@ -579,7 +589,7 @@ public class BlurClusterTest {
       }
       assertEquals(blurException.getErrorType(), ErrorType.QUERY_CANCEL);
     } finally {
-      IndexManager.DEBUG_RUN_SLOW.set(false);
+      setDebugRunSlow(tableName, false);
     }
     // Tests that the exitable reader was reset.
     client.terms(tableName, "test", "facet", null, (short) 100);
@@ -593,17 +603,24 @@ public class BlurClusterTest {
     createTable(tableName);
     loadTable(tableName);
     try {
-      IndexManager.DEBUG_RUN_SLOW.set(true);
+      setDebugRunSlow(tableName, true);
       runBackPressureViaQuery(tableName);
       Thread.sleep(1000);
       System.gc();
       System.gc();
       Thread.sleep(1000);
     } finally {
-      IndexManager.DEBUG_RUN_SLOW.set(false);
+      setDebugRunSlow(tableName, false);
     }
   }
 
+  /**
+   * Runs a {@link RunSlowForTesting} command against {@code table} through the
+   * cluster client, carrying {@code flag} as the run-slow value, rather than
+   * setting the flag in this JVM.
+   */
+  private void setDebugRunSlow(String table, boolean flag) throws IOException {
+    RunSlowForTesting command = new RunSlowForTesting();
+    command.setTable(table);
+    command.setRunSlow(flag);
+    command.run(getClient());
+  }
+
   private void runBackPressureViaQuery(final String tableName) throws InterruptedException {
     final Iface client = getClient();
     final BlurQuery blurQueryRow = new BlurQuery();
@@ -735,14 +752,15 @@ public class BlurClusterTest {
 
     // This should block until shards have failed over
     client.shardServerLayout(tableName);
-    
+
     assertEquals("We should have lost a node.", 2, client.shardServerList(BlurConstants.DEFAULT).size());
     assertEquals(numberOfDocs, client.query(tableName, blurQuery).getTotalResults());
 
-    miniCluster.startShards(1, true);
+    miniCluster.startShards(1, true, externalProcesses);
     Thread.sleep(TimeUnit.SECONDS.toMillis(1));
-    
-    assertEquals("We should have the cluster back where we started.", 3, client.shardServerList(BlurConstants.DEFAULT).size());
+
+    assertEquals("We should have the cluster back where we started.", 3, client.shardServerList(BlurConstants.DEFAULT)
+        .size());
   }
 
   @Test
@@ -765,6 +783,106 @@ public class BlurClusterTest {
     client2.mutate(rowMutation);
   }
 
+  /**
+   * Adds a column definition for a new family, removes the table, re-creates
+   * it under the same name and then checks that the family is absent from the
+   * schema as reported by a second controller connection and by every shard
+   * server of every cluster.
+   */
+  @Test
+  public void testTableRemovalWithFieldDefsV1() throws BlurException, TException, IOException, InterruptedException {
+    final String tableName = "testTableRemovalWithFieldDefsV1";
+    final String family = "testTableRemovalWithFieldDefs-fam";
+    createTable(tableName);
+    loadTable(tableName);
+
+    List<Connection> controllers = BlurClientManager.getConnections(miniCluster.getControllerConnectionStr());
+    Iface firstController = BlurClient.getClient(controllers.get(0));
+
+    ColumnDefinition definition = new ColumnDefinition();
+    definition.setFamily(family);
+    definition.setColumnName("col");
+    definition.setSubColumnName(null);
+    definition.setFieldType("string");
+    definition.setFieldLessIndexed(false);
+    definition.setSortable(false);
+    assertTrue(firstController.addColumnDefinition(tableName, definition));
+
+    firstController.disableTable(tableName);
+    firstController.removeTable(tableName, true);
+
+    createTable(tableName);
+
+    // Ask through a different connection than the one that added the family.
+    Iface secondController = BlurClient.getClient(controllers.get(1));
+    assertFamilyIsNotPresent(tableName, secondController, family);
+
+    for (String cluster : secondController.shardClusterList()) {
+      for (String shardServer : secondController.shardServerList(cluster)) {
+        assertFamilyIsNotPresent(tableName, BlurClient.getClient(shardServer), family);
+      }
+    }
+
+  }
+
+  /**
+   * Captures the schema and the files under the table's "types" directory,
+   * removes the table, re-creates and reloads it under the same name, and then
+   * checks that the schema (table name, family names, column definitions per
+   * family) and the "types" file listing match what was captured before.
+   */
+  @Test
+  public void testTableRemovalWithFieldDefsV2() throws BlurException, TException, IOException, InterruptedException {
+    String tableName = "testTableRemovalWithFieldDefsV2";
+    createTable(tableName);
+    loadTable(tableName);
+
+    Iface client = getClient();
+    Schema expectedSchema = client.schema(tableName);
+    TableDescriptor describe = client.describe(tableName);
+    Set<String> expectedFiles = listTypesDirectory(describe, "Expected ");
+
+    client.disableTable(tableName);
+    client.removeTable(tableName, true);
+
+    createTable(tableName);
+    loadTable(tableName);
+    Schema actualSchema = client.schema(tableName);
+
+    assertEquals(expectedSchema.getTable(), actualSchema.getTable());
+    Map<String, Map<String, ColumnDefinition>> expectedFamilies = expectedSchema.getFamilies();
+    Map<String, Map<String, ColumnDefinition>> actualFamilies = actualSchema.getFamilies();
+    assertEquals(new TreeSet<String>(expectedFamilies.keySet()), new TreeSet<String>(actualFamilies.keySet()));
+
+    // Copy into sorted maps so the comparison does not depend on the map
+    // implementation returned by the client.
+    for (String family : expectedFamilies.keySet()) {
+      Map<String, ColumnDefinition> expectedColDefMap = new TreeMap<String, ColumnDefinition>(
+          expectedFamilies.get(family));
+      Map<String, ColumnDefinition> actualColDefMap = new TreeMap<String, ColumnDefinition>(actualFamilies.get(family));
+      assertEquals(expectedColDefMap, actualColDefMap);
+    }
+
+    System.out.println(expectedSchema);
+
+    // The descriptor fetched before removal is reused for the second listing,
+    // exactly as the table URI was reused before this helper was extracted.
+    Set<String> actualFiles = listTypesDirectory(describe, "Actual ");
+    assertEquals(expectedFiles, actualFiles);
+  }
+
+  /**
+   * Lists the entries of the "types" directory under the descriptor's table
+   * URI, printing each path prefixed with {@code label}.
+   *
+   * @return the listed paths as strings, in sorted order
+   */
+  private Set<String> listTypesDirectory(TableDescriptor descriptor, String label) throws IOException {
+    Path tablePath = new Path(descriptor.getTableUri());
+    FileSystem fileSystem = tablePath.getFileSystem(new Configuration());
+    Set<String> files = new TreeSet<String>();
+    for (FileStatus fileStatus : fileSystem.listStatus(new Path(tablePath, "types"))) {
+      System.out.println(label + fileStatus.getPath());
+      files.add(fileStatus.getPath().toString());
+    }
+    return files;
+  }
+
+  /**
+   * Fetches the schema of {@code tableName} through {@code client} and asserts
+   * that its family map holds no entry for {@code family}.
+   */
+  private void assertFamilyIsNotPresent(String tableName, Iface client, String family) throws BlurException, TException {
+    Map<String, Map<String, ColumnDefinition>> familiesByName = client.schema(tableName).getFamilies();
+    assertNull(familiesByName.get(family));
+  }
+
   private void assertRowResults(BlurResults results) {
     for (BlurResult result : results.getResults()) {
       assertNull(result.locationId);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e9724d30/blur-core/src/test/java/org/apache/blur/thrift/SuiteCluster.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/thrift/SuiteCluster.java b/blur-core/src/test/java/org/apache/blur/thrift/SuiteCluster.java
index db823a7..36bc982 100644
--- a/blur-core/src/test/java/org/apache/blur/thrift/SuiteCluster.java
+++ b/blur-core/src/test/java/org/apache/blur/thrift/SuiteCluster.java
@@ -48,7 +48,7 @@ public class SuiteCluster {
     testDirectory.delete();
     if (cluster == null) {
       cluster = new MiniCluster();
-      cluster.startBlurCluster(new File(testDirectory, "cluster").getAbsolutePath(), 2, 3, true);
+      cluster.startBlurCluster(new File(testDirectory, "cluster").getAbsolutePath(), 2, 3, true, false);
 
       System.out.println("MiniCluster started at " + cluster.getControllerConnectionStr());
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e9724d30/blur-core/src/test/resources/META-INF/services/org.apache.blur.command.Commands
----------------------------------------------------------------------
diff --git a/blur-core/src/test/resources/META-INF/services/org.apache.blur.command.Commands b/blur-core/src/test/resources/META-INF/services/org.apache.blur.command.Commands
index f06a262..ce500da 100644
--- a/blur-core/src/test/resources/META-INF/services/org.apache.blur.command.Commands
+++ b/blur-core/src/test/resources/META-INF/services/org.apache.blur.command.Commands
@@ -14,4 +14,5 @@
 #  limitations under the License.
 
 org.apache.blur.command.WaitForSeconds
-org.apache.blur.command.ThrowException
\ No newline at end of file
+org.apache.blur.command.ThrowException
+org.apache.blur.command.RunSlowForTesting

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e9724d30/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatMiniClusterTest.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatMiniClusterTest.java b/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatMiniClusterTest.java
index dc30ca4..c5e2562 100644
--- a/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatMiniClusterTest.java
+++ b/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatMiniClusterTest.java
@@ -90,7 +90,7 @@ public class BlurOutputFormatMiniClusterTest {
     System.setProperty("dfs.datanode.data.dir.perm", dirPermissionNum);
     testDirectory.delete();
     miniCluster = new MiniCluster();
-    miniCluster.startBlurCluster(new File(testDirectory, "cluster").getAbsolutePath(), 2, 3, true);
+    miniCluster.startBlurCluster(new File(testDirectory, "cluster").getAbsolutePath(), 2, 3, true, false);
 
     // System.setProperty("test.build.data",
     // "./target/BlurOutputFormatTest/data");
@@ -162,7 +162,7 @@ public class BlurOutputFormatMiniClusterTest {
     BlurOutputFormat.setupJob(job, tableDescriptor);
     Path output = new Path(TEST_ROOT_DIR + "/out");
     BlurOutputFormat.setOutputPath(job, output);
-    
+
     Path tablePath = new Path(tableUri);
     Path shardPath = new Path(tablePath, ShardUtil.getShardName(0));
     FileStatus[] listStatus = fileSystem.listStatus(shardPath);
@@ -175,7 +175,7 @@ public class BlurOutputFormatMiniClusterTest {
     assertTrue(job.waitForCompletion(true));
     Counters ctrs = job.getCounters();
     System.out.println("Counters: " + ctrs);
-    
+
     client.loadData(tableName, output.toString());
 
     while (true) {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e9724d30/blur-mapred-hadoop2/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatMiniClusterTest.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop2/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatMiniClusterTest.java b/blur-mapred-hadoop2/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatMiniClusterTest.java
index 43ff57f..621a364 100644
--- a/blur-mapred-hadoop2/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatMiniClusterTest.java
+++ b/blur-mapred-hadoop2/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatMiniClusterTest.java
@@ -90,7 +90,7 @@ public class BlurOutputFormatMiniClusterTest {
     System.setProperty("dfs.datanode.data.dir.perm", dirPermissionNum);
     testDirectory.delete();
     miniCluster = new MiniCluster();
-    miniCluster.startBlurCluster(new File(testDirectory, "cluster").getAbsolutePath(), 2, 3, true);
+    miniCluster.startBlurCluster(new File(testDirectory, "cluster").getAbsolutePath(), 2, 3, true, false);
 
     // System.setProperty("test.build.data",
     // "./target/BlurOutputFormatTest/data");
@@ -108,7 +108,7 @@ public class BlurOutputFormatMiniClusterTest {
     mr = (MiniMRYarnClusterAdapter) MiniMRClientClusterFactory.create(BlurOutputFormatTest.class, 1, conf);
     mr.start();
     conf = mr.getConfig();
-    
+
     BufferStore.initNewBuffer(128, 128 * 128);
   }
 
@@ -152,8 +152,8 @@ public class BlurOutputFormatMiniClusterTest {
     job.setInputFormatClass(TextInputFormat.class);
 
     FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
-    String tableUri = new Path(TEST_ROOT_DIR + "/blur/" + tableName).makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory())
-        .toString();
+    String tableUri = new Path(TEST_ROOT_DIR + "/blur/" + tableName).makeQualified(fileSystem.getUri(),
+        fileSystem.getWorkingDirectory()).toString();
     CsvBlurMapper.addColumns(job, "cf1", "col");
 
     TableDescriptor tableDescriptor = new TableDescriptor();
@@ -167,7 +167,7 @@ public class BlurOutputFormatMiniClusterTest {
     BlurOutputFormat.setupJob(job, tableDescriptor);
     Path output = new Path(TEST_ROOT_DIR + "/out");
     BlurOutputFormat.setOutputPath(job, output);
-    
+
     Path tablePath = new Path(tableUri);
     Path shardPath = new Path(tablePath, ShardUtil.getShardName(0));
     FileStatus[] listStatus = fileSystem.listStatus(shardPath);
@@ -180,7 +180,7 @@ public class BlurOutputFormatMiniClusterTest {
     assertTrue(job.waitForCompletion(true));
     Counters ctrs = job.getCounters();
     System.out.println("Counters: " + ctrs);
-    
+
     client.loadData(tableName, output.toString());
 
     while (true) {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e9724d30/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectoryTest.java
----------------------------------------------------------------------
diff --git a/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectoryTest.java b/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectoryTest.java
index 953ae04..84e00fc 100644
--- a/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectoryTest.java
+++ b/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectoryTest.java
@@ -16,10 +16,11 @@
  */
 package org.apache.blur.store.hdfs_v2;
 
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.*;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
 import java.util.Set;
@@ -42,6 +43,8 @@ import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.index.SerialMergeScheduler;
 import org.apache.lucene.index.TieredMergePolicy;
 import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.util.Version;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -112,9 +115,14 @@ public class FastHdfsKeyValueDirectoryTest {
     Random random = new Random(seed);
     int docCount = 0;
     int passes = 50;
+    byte[] segmentsGenContents = null;
     for (int run = 0; run < passes; run++) {
       final FastHdfsKeyValueDirectory directory = new FastHdfsKeyValueDirectory(_timer, _configuration, new Path(_path,
           "test_multiple_commits_reopens"));
+      if (segmentsGenContents != null) {
+        byte[] segmentsGenContentsCurrent = readSegmentsGen(directory);
+        assertTrue(Arrays.equals(segmentsGenContents, segmentsGenContentsCurrent));
+      }
       assertFiles(fileSet, run, -1, directory);
       assertEquals(docCount, getDocumentCount(directory));
       IndexWriter writer = new IndexWriter(directory, conf.clone());
@@ -134,11 +142,23 @@ public class FastHdfsKeyValueDirectoryTest {
           IndexCommit indexCommit = listCommits.get(0);
           fileSet.addAll(indexCommit.getFileNames());
         }
+        segmentsGenContents = readSegmentsGen(directory);
       }
       docCount = getDocumentCount(directory);
     }
   }
 
+  /**
+   * Reads the complete contents of the "segments.gen" file in the directory.
+   *
+   * @return the file's bytes, or {@code null} when the directory reports that
+   *         no "segments.gen" file exists
+   * @throws IOException if the file cannot be opened or read
+   */
+  private byte[] readSegmentsGen(FastHdfsKeyValueDirectory directory) throws IOException {
+    if (!directory.fileExists("segments.gen")) {
+      return null;
+    }
+    IndexInput input = directory.openInput("segments.gen", IOContext.READ);
+    try {
+      byte[] data = new byte[(int) input.length()];
+      input.readBytes(data, 0, data.length);
+      return data;
+    } finally {
+      // This helper is called on every pass of the test loop; close the input
+      // so no handle is left open, even when the read fails.
+      input.close();
+    }
+  }
+
   private int getDocumentCount(Directory directory) throws IOException {
     if (DirectoryReader.indexExists(directory)) {
       DirectoryReader reader = DirectoryReader.open(directory);
@@ -154,7 +174,7 @@ public class FastHdfsKeyValueDirectoryTest {
     Set<String> actual;
     if (DirectoryReader.indexExists(directory)) {
       List<IndexCommit> listCommits = DirectoryReader.listCommits(directory);
-      assertEquals(1, listCommits.size());
+      // assertEquals(1, listCommits.size());
       IndexCommit indexCommit = listCommits.get(0);
       actual = new TreeSet<String>(indexCommit.getFileNames());
     } else {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e9724d30/blur-util/src/main/java/org/apache/blur/BlurConfiguration.java
----------------------------------------------------------------------
diff --git a/blur-util/src/main/java/org/apache/blur/BlurConfiguration.java b/blur-util/src/main/java/org/apache/blur/BlurConfiguration.java
index c6b256f..3eac654 100644
--- a/blur-util/src/main/java/org/apache/blur/BlurConfiguration.java
+++ b/blur-util/src/main/java/org/apache/blur/BlurConfiguration.java
@@ -16,9 +16,12 @@ package org.apache.blur;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+import java.io.File;
+import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -63,15 +66,36 @@ public class BlurConfiguration implements Cloneable {
   }
 
   private void init() throws IOException {
-    _properties.putAll(load("/blur-default.properties"));
-    _properties.putAll(load("/blur-site.properties"));
+    _properties.putAll(loadInternal("/blur-default.properties"));
+    _properties.putAll(loadInternal("/blur-site.properties"));
   }
 
-  private Properties load(String path) throws IOException {
+  public void load(InputStream inputStream) throws IOException {
+    _properties.putAll(loadInternal(inputStream));
+  }
+
+  /**
+   * Reads properties from the given file and puts them into this
+   * configuration. The file stream opened here is always closed before
+   * returning.
+   *
+   * @throws IOException if the file cannot be opened or read
+   */
+  public void load(File path) throws IOException {
+    FileInputStream in = new FileInputStream(path);
+    try {
+      Properties fromFile = loadInternal(in);
+      _properties.putAll(fromFile);
+    } finally {
+      in.close();
+    }
+  }
+
+  /**
+   * Loads the properties found at the given resource path, resolved through
+   * this class's {@code getResourceAsStream}.
+   *
+   * @throws FileNotFoundException if no resource exists at {@code path}
+   */
+  private Properties loadInternal(String path) throws IOException {
     InputStream inputStream = getClass().getResourceAsStream(path);
     if (inputStream == null) {
       throw new FileNotFoundException(path);
     }
+    // The stream was opened by this method, so it is closed here as well.
+    try {
+      return loadInternal(inputStream);
+    } finally {
+      inputStream.close();
+    }
+  }
+
+  private Properties loadInternal(InputStream inputStream) throws IOException {
     Properties properties = new Properties();
     properties.load(inputStream);
     return properties;
@@ -88,13 +112,13 @@ public class BlurConfiguration implements Cloneable {
   public String get(String name) {
     return get(name, null);
   }
-  
+
   public String getExpected(String name) {
-	  String val = get(name);
-	  if (val == null || val.trim().isEmpty()) {
-	      throw new IllegalArgumentException("Property [" + name + "] is missing or blank.");
-	    }
-	    return val;
+    String val = get(name);
+    if (val == null || val.trim().isEmpty()) {
+      throw new IllegalArgumentException("Property [" + name + "] is missing or blank.");
+    }
+    return val;
   }
 
   public String get(String name, String defaultValue) {
@@ -158,4 +182,8 @@ public class BlurConfiguration implements Cloneable {
     return clone;
   }
 
+  /**
+   * Writes the current properties to the stream by delegating to
+   * {@code _properties.store}. The stream is not closed by this method.
+   */
+  public void write(OutputStream outputStream) throws IOException {
+    // null means no caller-supplied header comment is written.
+    final String headerComment = null;
+    _properties.store(outputStream, headerComment);
+  }
+
 }


Mime
View raw message