incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [11/11] git commit: Trying to track down a nasty ZooKeeper bug.
Date Thu, 10 Jan 2013 19:49:57 GMT
Updated Branches:
  refs/heads/0.2-dev fb233db55 -> 57fd000d4


Trying to track down a nasty ZooKeeper bug.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/57fd000d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/57fd000d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/57fd000d

Branch: refs/heads/0.2-dev
Commit: 57fd000d414363902c462e90c2ccb8c88baad559
Parents: fa69294
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Thu Jan 10 14:49:17 2013 -0500
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Thu Jan 10 14:49:17 2013 -0500

----------------------------------------------------------------------
 .../clusterstatus/SimpleZKClusterStatus.java       |  219 +++++++++++++++
 .../clusterstatus/ZookeeperClusterStatus.java      |    7 +-
 .../clusterstatus/ZookeeperPathConstants.java      |   67 +-----
 .../indexserver/DistributedIndexServer.java        |   13 +-
 .../apache/blur/thrift/AbstractThriftServer.java   |  161 +++++++++++
 .../org/apache/blur/thrift/ThriftBlurServer.java   |   13 +-
 .../java/org/apache/blur/thrift/ThriftServer.java  |  161 -----------
 .../main/java/org/apache/blur/utils/BlurUtil.java  |    7 +-
 .../src/test/java/org/apache/blur/MiniCluster.java |   20 +-
 .../org/apache/blur/shell/SchemaTableCommand.java  |    2 +-
 src/blur-util/pom.xml                              |   19 ++-
 .../org/apache/blur/zookeeper/WatchChildren.java   |    9 +-
 12 files changed, 438 insertions(+), 260 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/57fd000d/src/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/SimpleZKClusterStatus.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/SimpleZKClusterStatus.java b/src/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/SimpleZKClusterStatus.java
new file mode 100644
index 0000000..541e897
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/SimpleZKClusterStatus.java
@@ -0,0 +1,219 @@
+package org.apache.blur.manager.clusterstatus;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.blur.analysis.BlurAnalyzer;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.thrift.generated.Analyzer;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.utils.BlurUtil;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+public class SimpleZKClusterStatus extends ClusterStatus {
+  private static final Log LOG = LogFactory.getLog(SimpleZKClusterStatus.class);
+  private ZooKeeper _zk;
+  private String _cluster;
+
+  public SimpleZKClusterStatus(String cluster, ZooKeeper zooKeeper) {
+    _cluster = cluster;
+    _zk = zooKeeper;
+  }
+
+  @Override
+  public List<String> getOnlineServers(boolean useCache) {
+    try {
+      return _zk.getChildren(ZookeeperPathConstants.getOnlineServersPath(_cluster), false);
+    } catch (KeeperException e) {
+      throw new RuntimeException(e);
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public List<String> getServerList(boolean useCache) {
+    try {
+      return _zk.getChildren(ZookeeperPathConstants.getRegisteredServersPath(_cluster), false);
+    } catch (KeeperException e) {
+      throw new RuntimeException(e);
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public String getClusterName() {
+    return _cluster;
+  }
+
+  @Override
+  public boolean exists(boolean useCache, String table) {
+    List<String> tableList = getTableList(useCache);
+    return tableList.contains(table);
+  }
+
+  @Override
+  public boolean isInSafeMode(boolean useCache) {
+    try {
+      String blurSafemodePath = ZookeeperPathConstants.getSafemodePath(_cluster);
+      Stat stat = _zk.exists(blurSafemodePath, false);
+      if (stat == null) {
+        return false;
+      }
+      byte[] data = _zk.getData(blurSafemodePath, false, stat);
+      if (data == null) {
+        return false;
+      }
+      long timestamp = Long.parseLong(new String(data));
+      long waitTime = timestamp - System.currentTimeMillis();
+      if (waitTime > 0) {
+        return true;
+      }
+      return false;
+    } catch (KeeperException e) {
+      throw new RuntimeException(e);
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public List<String> getTableList(boolean useCache) {
+    try {
+      return _zk.getChildren(ZookeeperPathConstants.getTablesPath(_cluster), false);
+    } catch (KeeperException e) {
+      throw new RuntimeException(e);
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public TableDescriptor getTableDescriptor(boolean useCache, String table) {
+    TableDescriptor tableDescriptor = new TableDescriptor();
+    try {
+      String blurTablePath = ZookeeperPathConstants.getTablePath(getClusterName(), table);
+      byte[] data = getData(blurTablePath);
+      BlurUtil.write(data, tableDescriptor);
+    } catch (KeeperException e) {
+      throw new RuntimeException(e);
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+    return tableDescriptor;
+  }
+
+  @Override
+  public void createTable(TableDescriptor tableDescriptor) {
+    try {
+      String table = BlurUtil.nullCheck(tableDescriptor.getName(), "Name cannot be null.");
+      String uri = BlurUtil.nullCheck(tableDescriptor.getStoragePath(), "Storage path cannot be null.");
+      int shardCount = BlurUtil.zeroCheck(tableDescriptor.shardCount, "ShardCount cannot be less than 1");
+      String tablePath = ZookeeperPathConstants.getTablePath(getClusterName(), table);
+
+      Analyzer analyzer = tableDescriptor.getAnalyzer();
+      if (analyzer != null) {
+        // check the analyzer to be valid
+        new BlurAnalyzer(analyzer);
+      }
+
+      if (_zk.exists(tablePath, false) != null) {
+        throw new IOException("Table [" + table + "] already exists.");
+      }
+      BlurUtil.setupFileSystem(uri, shardCount);
+      BlurUtil.createPath(_zk, tablePath, BlurUtil.read(tableDescriptor));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    } catch (KeeperException e) {
+      throw new RuntimeException(e);
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void disableTable(String table) throws IOException {
+    try {
+      String tablePath = ZookeeperPathConstants.getTablePath(_cluster, table);
+      if (_zk.exists(tablePath, false) == null) {
+        throw new IOException("Table [" + table + "] does not exist.");
+      }
+      TableDescriptor tableDescriptor = getTableDescriptor(false, table);
+      if (!tableDescriptor.isEnabled()) {
+        throw new IOException("Table [" + table + "] is already disabled.");
+      }
+      tableDescriptor.setEnabled(false);
+      _zk.setData(tablePath, BlurUtil.read(tableDescriptor), -1);
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    } catch (KeeperException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void enableTable(String table) {
+    try {
+      String tablePath = ZookeeperPathConstants.getTablePath(_cluster, table);
+      if (_zk.exists(tablePath, false) == null) {
+        throw new IOException("Table [" + table + "] does not exist.");
+      }
+      TableDescriptor tableDescriptor = getTableDescriptor(false, table);
+      if (tableDescriptor.isEnabled()) {
+        LOG.info("Table [" + table + "] is already enabled.");
+      }
+      tableDescriptor.setEnabled(true);
+      _zk.setData(tablePath, BlurUtil.read(tableDescriptor), -1);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    } catch (KeeperException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void removeTable(String table, boolean deleteIndexFiles) {
+    try {
+      String tablePath = ZookeeperPathConstants.getTablePath(_cluster, table);
+      if (_zk.exists(tablePath, false) == null) {
+        throw new IOException("Table [" + table + "] does not exist.");
+      }
+      TableDescriptor tableDescriptor = getTableDescriptor(false, table);
+      if (tableDescriptor.isEnabled()) {
+        throw new IOException("Table [" + table + "] is NOT disabled.");
+      }
+      _zk.delete(tablePath, -1);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    } catch (KeeperException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public boolean isOpen() {
+    return true;
+  }
+
+  @Override
+  public void close() {
+
+  }
+
+  private byte[] getData(String path) throws KeeperException, InterruptedException {
+    Stat stat = _zk.exists(path, false);
+    if (stat == null) {
+      return null;
+    }
+    return _zk.getData(path, false, stat);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/57fd000d/src/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatus.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatus.java b/src/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatus.java
index 8220d40..ad42efb 100644
--- a/src/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatus.java
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatus.java
@@ -133,7 +133,7 @@ public class ZookeeperClusterStatus extends ClusterStatus {
     long s = System.nanoTime();
     try {
       checkIfOpen();
-      return _zk.getChildren(ZookeeperPathConstants.getClustersPath() + "/" + _cluster + "/online/shard-nodes", false);
+      return _zk.getChildren(ZookeeperPathConstants.getOnlineServersPath(_cluster), false);
     } catch (KeeperException e) {
       throw new RuntimeException(e);
     } catch (InterruptedException e) {
@@ -145,7 +145,7 @@ public class ZookeeperClusterStatus extends ClusterStatus {
   }
 
   private void watchForOnlineShardNodes(final String cluster) {
-    WatchChildren watch = new WatchChildren(_zk, ZookeeperPathConstants.getOnlineShardsPath(cluster)).watch(new OnChange() {
+    WatchChildren watch = new WatchChildren(_zk, ZookeeperPathConstants.getOnlineServersPath(cluster)).watch(new OnChange() {
       @Override
       public void action(List<String> children) {
         _onlineShardsNodes.put(cluster, children);
@@ -162,7 +162,8 @@ public class ZookeeperClusterStatus extends ClusterStatus {
     long s = System.nanoTime();
     try {
       checkIfOpen();
-      return _zk.getChildren(ZookeeperPathConstants.getClustersPath() + "/" + _cluster + "/shard-nodes", false);
+      
+      return _zk.getChildren(ZookeeperPathConstants.getRegisteredServersPath(_cluster), false);
     } catch (KeeperException e) {
       throw new RuntimeException(e);
     } catch (InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/57fd000d/src/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperPathConstants.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperPathConstants.java b/src/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperPathConstants.java
index 7032f74..62258cb 100644
--- a/src/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperPathConstants.java
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperPathConstants.java
@@ -31,30 +31,10 @@ public class ZookeeperPathConstants {
     return getBasePath() + "/clusters";
   }
 
-  public static String getOnlineControllersPath() {
-    return getBasePath() + "/online-controller-nodes";
-  }
-
-//  public static String getTableEnabledPath(String cluster, String table) {
-//    return getTablePath(cluster, table) + "/enabled";
-//  }
-
-//  public static String getTableUriPath(String cluster, String table) {
-//    return getTablePath(cluster, table) + "/uri";
-//  }
-
-//  public static String getTableShardCountPath(String cluster, String table) {
-//    return getTablePath(cluster, table) + "/shard-count";
-//  }
-
-  public static String getOnlinePath(String cluster) {
+  public static String getOnlineServersPath(String cluster) {
     return getClusterPath(cluster) + "/online";
   }
 
-  public static String getOnlineShardsPath(String cluster) {
-    return getOnlinePath(cluster) + "/shard-nodes";
-  }
-
   public static String getTablesPath(String cluster) {
     return getClusterPath(cluster) + "/tables";
   }
@@ -67,48 +47,7 @@ public class ZookeeperPathConstants {
     return getClusterPath(cluster) + "/safemode";
   }
 
-  public static String getRegisteredShardsPath(String cluster) {
-    return getClusterPath(cluster) + "/shard-nodes";
+  public static String getRegisteredServersPath(String cluster) {
+    return getClusterPath(cluster) + "/servers";
   }
-
-//  public static String getTableCompressionCodecPath(String cluster, String table) {
-//    return getTablePath(cluster, table) + "/compression-codec";
-//  }
-
-//  public static String getTableCompressionBlockSizePath(String cluster, String table) {
-//    return getTablePath(cluster, table) + "/compression-blocksize";
-//  }
-
-//  public static String getLockPath(String cluster, String table) {
-//    return getTablePath(cluster, table) + "/locks";
-//  }
-
-//  public static String getTableBlockCachingFileTypesPath(String cluster, String table) {
-//    return getTablePath(cluster, table) + "/blockcachingfiletypes";
-//  }
-
-//  public static String getTableBlockCachingPath(String cluster, String table) {
-//    return getTablePath(cluster, table) + "/blockcaching";
-//  }
-
-//  public static String getTableSimilarityPath(String cluster, String table) {
-//    return getTablePath(cluster, table) + "/similarity";
-//  }
-
-//  public static String getTableFieldNamesPath(String cluster, String table) {
-//    return getTablePath(cluster, table) + "/fieldnames";
-//  }
-
-//  public static String getTableFieldNamesPath(String cluster, String table, String fieldName) {
-//    return getTableFieldNamesPath(cluster, table) + "/" + fieldName;
-//  }
-
-//  public static String getTableReadOnlyPath(String cluster, String table) {
-//    return getTablePath(cluster, table) + "/readonly";
-//  }
-
-//  public static String getTableColumnsToPreCache(String cluster, String table) {
-//    return getTablePath(cluster, table) + "/precache";
-//  }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/57fd000d/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java b/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
index 033a957..3e1082b 100644
--- a/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
@@ -134,12 +134,11 @@ public class DistributedIndexServer extends AbstractIndexServer {
     waitInSafeModeIfNeeded();
     _running.set(true);
     setupTableWarmer();
-    watchForShardServerChanges();
+//    watchForShardServerChanges();
   }
 
   private void watchForShardServerChanges() {
-    ZookeeperPathConstants.getOnlineShardsPath(_cluster);
-    _watchOnlineShards = new WatchChildren(_zookeeper, ZookeeperPathConstants.getOnlineShardsPath(_cluster)).watch(new OnChange() {
+    _watchOnlineShards = new WatchChildren(_zookeeper, ZookeeperPathConstants.getOnlineServersPath(_cluster)).watch(new OnChange() {
       private List<String> _prevOnlineShards = new ArrayList<String>();
 
       @Override
@@ -185,10 +184,10 @@ public class DistributedIndexServer extends AbstractIndexServer {
   }
 
   private void setupSafeMode() throws KeeperException, InterruptedException {
-    String shardsPath = ZookeeperPathConstants.getOnlineShardsPath(_cluster);
+    String shardsPath = ZookeeperPathConstants.getOnlineServersPath(_cluster);
     List<String> children = _zookeeper.getChildren(shardsPath, false);
     if (children.size() == 0) {
-      throw new RuntimeException("No shards registered!");
+      throw new RuntimeException("No servers registered!");
     }
     if (children.size() != 1) {
       return;
@@ -264,8 +263,8 @@ public class DistributedIndexServer extends AbstractIndexServer {
 
   private void registerMyself() {
     String nodeName = getNodeName();
-    String registeredShardsPath = ZookeeperPathConstants.getRegisteredShardsPath(_cluster) + "/" + nodeName;
-    String onlineShardsPath = ZookeeperPathConstants.getOnlineShardsPath(_cluster) + "/" + nodeName;
+    String registeredShardsPath = ZookeeperPathConstants.getRegisteredServersPath(_cluster) + "/" + nodeName;
+    String onlineShardsPath = ZookeeperPathConstants.getOnlineServersPath(_cluster) + "/" + nodeName;
     try {
       if (_zookeeper.exists(registeredShardsPath, false) == null) {
         _zookeeper.create(registeredShardsPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/57fd000d/src/blur-core/src/main/java/org/apache/blur/thrift/AbstractThriftServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/AbstractThriftServer.java b/src/blur-core/src/main/java/org/apache/blur/thrift/AbstractThriftServer.java
new file mode 100644
index 0000000..da1775e
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/thrift/AbstractThriftServer.java
@@ -0,0 +1,161 @@
+package org.apache.blur.thrift;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.blur.BlurConfiguration;
+import org.apache.blur.concurrent.Executors;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.manager.indexserver.BlurServerShutDown.BlurShutdown;
+import org.apache.blur.thrift.ExecutorServicePerMethodCallThriftServer.Args;
+import org.apache.blur.thrift.generated.Blur;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.thrift.TBaseProcessor;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.transport.TNonblockingServerSocket;
+import org.apache.thrift.transport.TTransportException;
+
+
+public abstract class AbstractThriftServer {
+
+  private static final Log LOG = LogFactory.getLog(AbstractThriftServer.class);
+
+  private String _nodeName;
+  private TServer _server;
+  private boolean _closed;
+  private BlurConfiguration _configuration;
+  private int _threadCount;
+  private int _bindPort;
+  private String _bindAddress;
+  private BlurShutdown _shutdown;
+  private ExecutorService _executorService;
+  private ExecutorService _queryExexutorService;
+  private ExecutorService _mutateExecutorService;
+  private TBaseProcessor<?> _processor;
+
+  public synchronized void close() {
+    if (!_closed) {
+      _closed = true;
+      _shutdown.shutdown();
+      _server.stop();
+      _executorService.shutdownNow();
+      _queryExexutorService.shutdownNow();
+      _mutateExecutorService.shutdownNow();
+    }
+  }
+
+  protected static int getServerIndex(String[] args) {
+    for (int i = 0; i < args.length; i++) {
+      if ("-s".equals(args[i])) {
+        if (i + 1 < args.length) {
+          return Integer.parseInt(args[i + 1]);
+        }
+      }
+    }
+    return 0;
+  }
+
+  public void start() throws TTransportException {
+//    Blur.Processor<Blur.Iface> processor = new Blur.Processor<Blur.Iface>(_iface);
+    TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(getBindInetSocketAddress(_configuration));
+
+    Args args = new Args(serverTransport);
+    args.processor(_processor);
+    _executorService = Executors.newThreadPool("thrift-processors", _threadCount);
+    args.executorService(_executorService);
+    Map<String, ExecutorService> methodCallsToExecutorService = new HashMap<String, ExecutorService>();
+    _mutateExecutorService = Executors.newThreadPool("thrift-processors-mutate", _threadCount);
+    methodCallsToExecutorService.put("mutate", _mutateExecutorService);
+    methodCallsToExecutorService.put("mutateBatch", _mutateExecutorService);
+    _queryExexutorService = Executors.newThreadPool("thrift-processors-query", _threadCount);
+    methodCallsToExecutorService.put("query", _queryExexutorService);
+    args.setMethodCallsToExecutorService(methodCallsToExecutorService);
+    _server = new ExecutorServicePerMethodCallThriftServer(args);
+    LOG.info("Starting server [{0}]", _nodeName);
+    _server.serve();
+  }
+
+  public InetSocketAddress getBindInetSocketAddress(BlurConfiguration configuration) {
+    return new InetSocketAddress(_bindAddress, _bindPort);
+  }
+
+  public static String isEmpty(String str, String name) {
+    if (str == null || str.trim().isEmpty()) {
+      throw new IllegalArgumentException("Property [" + name + "] is missing or blank.");
+    }
+    return str;
+  }
+
+  public String getNodeName() {
+    return _nodeName;
+  }
+
+  public void setNodeName(String nodeName) {
+    this._nodeName = nodeName;
+  }
+
+  public void setConfiguration(BlurConfiguration configuration) {
+    this._configuration = configuration;
+  }
+
+  public static String getNodeName(BlurConfiguration configuration, String hostNameProperty) throws UnknownHostException {
+    String hostName = configuration.get(hostNameProperty);
+    if (hostName == null) {
+      hostName = "";
+    }
+    hostName = hostName.trim();
+    if (hostName.isEmpty()) {
+      return InetAddress.getLocalHost().getHostName();
+    }
+    return hostName;
+  }
+
+  public void setBindPort(int bindPort) {
+    _bindPort = bindPort;
+  }
+
+  public void setBindAddress(String bindAddress) {
+    _bindAddress = bindAddress;
+  }
+
+  public void setThreadCount(int threadCount) {
+    this._threadCount = threadCount;
+  }
+
+  public BlurShutdown getShutdown() {
+    return _shutdown;
+  }
+
+  public void setShutdown(BlurShutdown shutdown) {
+    this._shutdown = shutdown;
+  }
+  
+  public TBaseProcessor<?> getProcessor() {
+    return _processor;
+  }
+
+  public void setProcessor(TBaseProcessor<?> processor) {
+    this._processor = processor;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/57fd000d/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurServer.java b/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurServer.java
index 876810c..a597250 100644
--- a/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurServer.java
+++ b/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurServer.java
@@ -48,7 +48,8 @@ import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
 import org.apache.blur.manager.BlurFilterCache;
 import org.apache.blur.manager.DefaultBlurFilterCache;
-import org.apache.blur.manager.clusterstatus.ZookeeperClusterStatus;
+//import org.apache.blur.manager.clusterstatus.ZookeeperClusterStatus;
+import org.apache.blur.manager.clusterstatus.SimpleZKClusterStatus;
 import org.apache.blur.manager.indexserver.BlurIndexWarmup;
 import org.apache.blur.manager.indexserver.BlurServerShutDown;
 import org.apache.blur.manager.indexserver.BlurServerShutDown.BlurShutdown;
@@ -72,7 +73,7 @@ import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.ZooKeeper;
 
-public class ThriftBlurServer extends ThriftServer {
+public class ThriftBlurServer extends AbstractThriftServer {
 
   private static final Log LOG = LogFactory.getLog(ThriftBlurServer.class);
 
@@ -83,11 +84,11 @@ public class ThriftBlurServer extends ThriftServer {
     Thread.setDefaultUncaughtExceptionHandler(new SimpleUncaughtExceptionHandler());
     BlurConfiguration configuration = new BlurConfiguration();
 
-    ThriftServer server = createServer(serverIndex, configuration);
+    AbstractThriftServer server = createServer(serverIndex, configuration);
     server.start();
   }
 
-  public static ThriftServer createServer(int serverIndex, BlurConfiguration configuration) throws Exception {
+  public static AbstractThriftServer createServer(int serverIndex, BlurConfiguration configuration) throws Exception {
     // setup block cache
     // 134,217,728 is the slab size, therefore there are 16,384 blocks
     // in a slab when using a block size of 8,192
@@ -160,8 +161,10 @@ public class ThriftBlurServer extends ThriftServer {
     }
 
     BlurUtil.setupZookeeper(zooKeeper, configuration.get(BLUR_CLUSTER_NAME));
+    
+    final SimpleZKClusterStatus clusterStatus = new SimpleZKClusterStatus(BlurConstants.BLUR_CLUSTER, zooKeeper);
 
-    final ZookeeperClusterStatus clusterStatus = new ZookeeperClusterStatus(BlurConstants.BLUR_CLUSTER, zooKeeper);
+//    final ZookeeperClusterStatus clusterStatus = new ZookeeperClusterStatus(BlurConstants.BLUR_CLUSTER, zooKeeper);
 
     final BlurIndexRefresher refresher = new BlurIndexRefresher();
     refresher.init();

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/57fd000d/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftServer.java b/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftServer.java
deleted file mode 100644
index 8fd5920..0000000
--- a/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftServer.java
+++ /dev/null
@@ -1,161 +0,0 @@
-package org.apache.blur.thrift;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-
-import org.apache.blur.BlurConfiguration;
-import org.apache.blur.concurrent.Executors;
-import org.apache.blur.log.Log;
-import org.apache.blur.log.LogFactory;
-import org.apache.blur.manager.indexserver.BlurServerShutDown.BlurShutdown;
-import org.apache.blur.thrift.ExecutorServicePerMethodCallThriftServer.Args;
-import org.apache.blur.thrift.generated.Blur;
-import org.apache.blur.thrift.generated.Blur.Iface;
-import org.apache.thrift.TBaseProcessor;
-import org.apache.thrift.server.TServer;
-import org.apache.thrift.transport.TNonblockingServerSocket;
-import org.apache.thrift.transport.TTransportException;
-
-
-public class ThriftServer {
-
-  private static final Log LOG = LogFactory.getLog(ThriftServer.class);
-
-  private String _nodeName;
-  private TServer _server;
-  private boolean _closed;
-  private BlurConfiguration _configuration;
-  private int _threadCount;
-  private int _bindPort;
-  private String _bindAddress;
-  private BlurShutdown _shutdown;
-  private ExecutorService _executorService;
-  private ExecutorService _queryExexutorService;
-  private ExecutorService _mutateExecutorService;
-  private TBaseProcessor<?> _processor;
-
-  public synchronized void close() {
-    if (!_closed) {
-      _closed = true;
-      _shutdown.shutdown();
-      _server.stop();
-      _executorService.shutdownNow();
-      _queryExexutorService.shutdownNow();
-      _mutateExecutorService.shutdownNow();
-    }
-  }
-
-  protected static int getServerIndex(String[] args) {
-    for (int i = 0; i < args.length; i++) {
-      if ("-s".equals(args[i])) {
-        if (i + 1 < args.length) {
-          return Integer.parseInt(args[i + 1]);
-        }
-      }
-    }
-    return 0;
-  }
-
-  public void start() throws TTransportException {
-//    Blur.Processor<Blur.Iface> processor = new Blur.Processor<Blur.Iface>(_iface);
-    TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(getBindInetSocketAddress(_configuration));
-
-    Args args = new Args(serverTransport);
-    args.processor(_processor);
-    _executorService = Executors.newThreadPool("thrift-processors", _threadCount);
-    args.executorService(_executorService);
-    Map<String, ExecutorService> methodCallsToExecutorService = new HashMap<String, ExecutorService>();
-    _mutateExecutorService = Executors.newThreadPool("thrift-processors-mutate", _threadCount);
-    methodCallsToExecutorService.put("mutate", _mutateExecutorService);
-    methodCallsToExecutorService.put("mutateBatch", _mutateExecutorService);
-    _queryExexutorService = Executors.newThreadPool("thrift-processors-query", _threadCount);
-    methodCallsToExecutorService.put("query", _queryExexutorService);
-    args.setMethodCallsToExecutorService(methodCallsToExecutorService);
-    _server = new ExecutorServicePerMethodCallThriftServer(args);
-    LOG.info("Starting server [{0}]", _nodeName);
-    _server.serve();
-  }
-
-  public InetSocketAddress getBindInetSocketAddress(BlurConfiguration configuration) {
-    return new InetSocketAddress(_bindAddress, _bindPort);
-  }
-
-  public static String isEmpty(String str, String name) {
-    if (str == null || str.trim().isEmpty()) {
-      throw new IllegalArgumentException("Property [" + name + "] is missing or blank.");
-    }
-    return str;
-  }
-
-  public String getNodeName() {
-    return _nodeName;
-  }
-
-  public void setNodeName(String nodeName) {
-    this._nodeName = nodeName;
-  }
-
-  public void setConfiguration(BlurConfiguration configuration) {
-    this._configuration = configuration;
-  }
-
-  public static String getNodeName(BlurConfiguration configuration, String hostNameProperty) throws UnknownHostException {
-    String hostName = configuration.get(hostNameProperty);
-    if (hostName == null) {
-      hostName = "";
-    }
-    hostName = hostName.trim();
-    if (hostName.isEmpty()) {
-      return InetAddress.getLocalHost().getHostName();
-    }
-    return hostName;
-  }
-
-  public void setBindPort(int bindPort) {
-    _bindPort = bindPort;
-  }
-
-  public void setBindAddress(String bindAddress) {
-    _bindAddress = bindAddress;
-  }
-
-  public void setThreadCount(int threadCount) {
-    this._threadCount = threadCount;
-  }
-
-  public BlurShutdown getShutdown() {
-    return _shutdown;
-  }
-
-  public void setShutdown(BlurShutdown shutdown) {
-    this._shutdown = shutdown;
-  }
-  
-  public TBaseProcessor<?> getProcessor() {
-    return _processor;
-  }
-
-  public void setProcessor(TBaseProcessor<?> processor) {
-    this._processor = processor;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/57fd000d/src/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java b/src/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java
index 1d8bea0..c32bd57 100644
--- a/src/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java
+++ b/src/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java
@@ -42,7 +42,6 @@ import org.apache.blur.manager.clusterstatus.ZookeeperPathConstants;
 import org.apache.blur.metrics.BlurMetrics;
 import org.apache.blur.metrics.BlurMetrics.MethodCall;
 import org.apache.blur.thrift.generated.Blur.Iface;
-import org.apache.blur.thrift.generated.QueryArgs;
 import org.apache.blur.thrift.generated.ScoreDoc;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
@@ -110,14 +109,12 @@ public class BlurUtil {
 
   public synchronized static void setupZookeeper(ZooKeeper zookeeper, String cluster) throws KeeperException, InterruptedException {
     BlurUtil.createIfMissing(zookeeper, ZookeeperPathConstants.getBasePath());
-    BlurUtil.createIfMissing(zookeeper, ZookeeperPathConstants.getOnlineControllersPath());
     BlurUtil.createIfMissing(zookeeper, ZookeeperPathConstants.getClustersPath());
     if (cluster != null) {
       BlurUtil.createIfMissing(zookeeper, ZookeeperPathConstants.getClusterPath(cluster));
       BlurUtil.createIfMissing(zookeeper, ZookeeperPathConstants.getSafemodePath(cluster));
-      BlurUtil.createIfMissing(zookeeper, ZookeeperPathConstants.getRegisteredShardsPath(cluster));
-      BlurUtil.createIfMissing(zookeeper, ZookeeperPathConstants.getOnlinePath(cluster));
-      BlurUtil.createIfMissing(zookeeper, ZookeeperPathConstants.getOnlineShardsPath(cluster));
+      BlurUtil.createIfMissing(zookeeper, ZookeeperPathConstants.getRegisteredServersPath(cluster));
+      BlurUtil.createIfMissing(zookeeper, ZookeeperPathConstants.getOnlineServersPath(cluster));
       BlurUtil.createIfMissing(zookeeper, ZookeeperPathConstants.getTablesPath(cluster));
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/57fd000d/src/blur-core/src/test/java/org/apache/blur/MiniCluster.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/test/java/org/apache/blur/MiniCluster.java b/src/blur-core/src/test/java/org/apache/blur/MiniCluster.java
index 4f89e12..cfa8c3c 100644
--- a/src/blur-core/src/test/java/org/apache/blur/MiniCluster.java
+++ b/src/blur-core/src/test/java/org/apache/blur/MiniCluster.java
@@ -46,7 +46,7 @@ import org.apache.blur.log.LogFactory;
 import org.apache.blur.thrift.BlurClientManager;
 import org.apache.blur.thrift.Connection;
 import org.apache.blur.thrift.ThriftBlurServer;
-import org.apache.blur.thrift.ThriftServer;
+import org.apache.blur.thrift.AbstractThriftServer;
 import org.apache.blur.thrift.generated.Blur.Client;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -70,8 +70,8 @@ public abstract class MiniCluster {
   private static MiniDFSCluster cluster;
   private static Thread serverThread;
   private static ZooKeeperServerMainEmbedded zooKeeperServerMain;
-  private static List<ThriftServer> controllers = new ArrayList<ThriftServer>();
-  private static List<ThriftServer> shards = new ArrayList<ThriftServer>();
+  private static List<AbstractThriftServer> controllers = new ArrayList<AbstractThriftServer>();
+  private static List<AbstractThriftServer> shards = new ArrayList<AbstractThriftServer>();
   private static int zkPort;
   private static String connectionStr;
 
@@ -89,13 +89,13 @@ public abstract class MiniCluster {
   }
 
   public static void stopControllers() {
-    for (ThriftServer s : controllers) {
+    for (AbstractThriftServer s : controllers) {
       s.close();
     }
   }
 
   public static void stopShards() {
-    for (ThriftServer s : shards) {
+    for (AbstractThriftServer s : shards) {
       s.close();
     }
   }
@@ -134,12 +134,12 @@ public abstract class MiniCluster {
   public static void startShards(final BlurConfiguration configuration, int num) {
     final BlurConfiguration localConf = getBlurConfiguration(configuration);
     ExecutorService executorService = Executors.newFixedThreadPool(num);
-    List<Future<ThriftServer>> futures = new ArrayList<Future<ThriftServer>>();
+    List<Future<AbstractThriftServer>> futures = new ArrayList<Future<AbstractThriftServer>>();
     for (int i = 0; i < num; i++) {
       final int index = i;
-      futures.add(executorService.submit(new Callable<ThriftServer>() {
+      futures.add(executorService.submit(new Callable<AbstractThriftServer>() {
         @Override
-        public ThriftServer call() throws Exception {
+        public AbstractThriftServer call() throws Exception {
           return ThriftBlurServer.createServer(index, localConf);
         }
       }));
@@ -148,7 +148,7 @@ public abstract class MiniCluster {
     int shardPort = localConf.getInt(BLUR_SHARD_BIND_PORT, 40020);
     for (int i = 0; i < num; i++) {
       try {
-        ThriftServer server = futures.get(i).get();
+        AbstractThriftServer server = futures.get(i).get();
         shards.add(server);
         Connection connection = new Connection("localhost", shardPort + i);
         startServer(server, connection);
@@ -164,7 +164,7 @@ public abstract class MiniCluster {
     }
   }
 
-  private static void startServer(final ThriftServer server, Connection connection) {
+  private static void startServer(final AbstractThriftServer server, Connection connection) {
     new Thread(new Runnable() {
       @Override
       public void run() {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/57fd000d/src/blur-shell/src/main/java/org/apache/blur/shell/SchemaTableCommand.java
----------------------------------------------------------------------
diff --git a/src/blur-shell/src/main/java/org/apache/blur/shell/SchemaTableCommand.java b/src/blur-shell/src/main/java/org/apache/blur/shell/SchemaTableCommand.java
index b740cf0..f98bfed 100644
--- a/src/blur-shell/src/main/java/org/apache/blur/shell/SchemaTableCommand.java
+++ b/src/blur-shell/src/main/java/org/apache/blur/shell/SchemaTableCommand.java
@@ -39,7 +39,7 @@ public class SchemaTableCommand extends Command {
     }
     String tablename = args[1];
 
-    out.println(client.schema(tablename));
+//    out.println(client.schema(tablename));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/57fd000d/src/blur-util/pom.xml
----------------------------------------------------------------------
diff --git a/src/blur-util/pom.xml b/src/blur-util/pom.xml
index 50499e5..663cb6f 100644
--- a/src/blur-util/pom.xml
+++ b/src/blur-util/pom.xml
@@ -36,7 +36,24 @@ under the License.
 			<groupId>org.apache.zookeeper</groupId>
 			<artifactId>zookeeper</artifactId>
 			<version>3.4.4</version>
-			<scope>provided</scope>
+			<exclusions>
+				<exclusion>
+					<groupId>javax.mail</groupId>
+					<artifactId>mail</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>javax.jms</groupId>
+					<artifactId>jms</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jdmk</groupId>
+					<artifactId>jmxtools</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jmx</groupId>
+					<artifactId>jmxri</artifactId>
+				</exclusion>
+			</exclusions>
 		</dependency>
 		<dependency>
 			<groupId>junit</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/57fd000d/src/blur-util/src/main/java/org/apache/blur/zookeeper/WatchChildren.java
----------------------------------------------------------------------
diff --git a/src/blur-util/src/main/java/org/apache/blur/zookeeper/WatchChildren.java b/src/blur-util/src/main/java/org/apache/blur/zookeeper/WatchChildren.java
index d880493..8c7c3da 100644
--- a/src/blur-util/src/main/java/org/apache/blur/zookeeper/WatchChildren.java
+++ b/src/blur-util/src/main/java/org/apache/blur/zookeeper/WatchChildren.java
@@ -32,7 +32,6 @@ import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
 
-
 public class WatchChildren implements Closeable {
 
   private final static Log LOG = LogFactory.getLog(WatchChildren.class);
@@ -69,7 +68,7 @@ public class WatchChildren implements Closeable {
     _watchThread = new Thread(new Runnable() {
       @Override
       public void run() {
-        startDoubleCheckThread();
+//        startDoubleCheckThread();
         while (_running.get()) {
           synchronized (_lock) {
             try {
@@ -81,7 +80,11 @@ public class WatchChildren implements Closeable {
                   }
                 }
               });
-              onChange.action(_children);
+              try {
+                onChange.action(_children);
+              } catch (Throwable t) {
+                LOG.error("Unknown error during onchange action [" + this + "].", t);
+              }
               _lock.wait();
             } catch (KeeperException e) {
               LOG.error("Error in instance [{0}]", e, instance);


Mime
View raw message