incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From twilli...@apache.org
Subject [1/2] git commit: Allow configs of cluster name to be non-static
Date Tue, 25 Sep 2012 00:37:05 GMT
Updated Branches:
  refs/heads/blur-25-cluster-config [created] 17ac0d02f


Allow configs of cluster name to be non-static


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/9f11647d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/9f11647d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/9f11647d

Branch: refs/heads/blur-25-cluster-config
Commit: 9f11647dc0f6fb043387bdd57b588dc5be4c6b48
Parents: 3ff1403
Author: Tim <twilliams@apache.org>
Authored: Mon Sep 24 18:18:54 2012 -0400
Committer: Tim <twilliams@apache.org>
Committed: Mon Sep 24 19:59:46 2012 -0400

----------------------------------------------------------------------
 .../indexserver/DistributedIndexServer.java        |   54 ++++++++-------
 .../org/apache/blur/thrift/BlurShardServer.java    |   11 +++
 .../java/org/apache/blur/thrift/TableAdmin.java    |    4 +
 .../apache/blur/thrift/ThriftBlurShardServer.java  |    6 +-
 4 files changed, 47 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9f11647d/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java b/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
index 2d60131..65da0a0 100644
--- a/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
@@ -109,6 +109,7 @@ public class DistributedIndexServer extends AbstractIndexServer {
   private Cache _cache;
   private BlurMetrics _blurMetrics;
   private ZooKeeper _zookeeper;
+  private String _cluster;
 
   // set internally
   private Timer _timerCacheFlush;
@@ -120,7 +121,6 @@ public class DistributedIndexServer extends AbstractIndexServer {
   private long _safeModeDelay;
   private BlurIndexWarmup _warmup = new DefaultBlurIndexWarmup();
   private IndexDeletionPolicy _indexDeletionPolicy;
-  private String cluster = BlurConstants.BLUR_CLUSTER;
   private DirectoryReferenceFileGC _gc;
   private long _timeBetweenCommits = TimeUnit.SECONDS.toMillis(60);
   private long _timeBetweenRefreshs = TimeUnit.MILLISECONDS.toMillis(500);
@@ -131,14 +131,14 @@ public class DistributedIndexServer extends AbstractIndexServer {
   }
 
   public void init() throws KeeperException, InterruptedException, IOException {
-    BlurUtil.setupZookeeper(_zookeeper, cluster);
+    BlurUtil.setupZookeeper(_zookeeper, _cluster);
     _openerService = Executors.newThreadPool("shard-opener", _shardOpenerThreadCount);
     _closer = new BlurIndexCloser();
     _closer.init();
     _gc = new DirectoryReferenceFileGC();
     _gc.init();
     setupFlushCacheTimer();
-    String lockPath = BlurUtil.lockForSafeMode(_zookeeper, getNodeName(), cluster);
+    String lockPath = BlurUtil.lockForSafeMode(_zookeeper, getNodeName(), _cluster);
     try {
       registerMyself();
       setupSafeMode();
@@ -152,8 +152,8 @@ public class DistributedIndexServer extends AbstractIndexServer {
   }
 
   private void watchForShardServerChanges() {
-    ZookeeperPathConstants.getOnlineShardsPath(cluster);
-    _watchOnlineShards = new WatchChildren(_zookeeper, ZookeeperPathConstants.getOnlineShardsPath(cluster)).watch(new OnChange() {
+    ZookeeperPathConstants.getOnlineShardsPath(_cluster);
+    _watchOnlineShards = new WatchChildren(_zookeeper, ZookeeperPathConstants.getOnlineShardsPath(_cluster)).watch(new OnChange() {
       private List<String> _prevOnlineShards = new ArrayList<String>();
 
       @Override
@@ -181,7 +181,7 @@ public class DistributedIndexServer extends AbstractIndexServer {
   }
 
   private void waitInSafeModeIfNeeded() throws KeeperException, InterruptedException {
-    String blurSafemodePath = ZookeeperPathConstants.getSafemodePath(cluster);
+    String blurSafemodePath = ZookeeperPathConstants.getSafemodePath(_cluster);
     Stat stat = _zookeeper.exists(blurSafemodePath, false);
     if (stat == null) {
       throw new RuntimeException("Safemode path missing [" + blurSafemodePath + "]");
@@ -199,7 +199,7 @@ public class DistributedIndexServer extends AbstractIndexServer {
   }
 
   private void setupSafeMode() throws KeeperException, InterruptedException {
-    String shardsPath = ZookeeperPathConstants.getOnlineShardsPath(cluster);
+    String shardsPath = ZookeeperPathConstants.getOnlineShardsPath(_cluster);
     List<String> children = _zookeeper.getChildren(shardsPath, false);
     if (children.size() == 0) {
       throw new RuntimeException("No shards registered!");
@@ -209,7 +209,7 @@ public class DistributedIndexServer extends AbstractIndexServer {
     }
     LOG.info("First node online, setting up safe mode.");
     long timestamp = System.currentTimeMillis() + _safeModeDelay;
-    String blurSafemodePath = ZookeeperPathConstants.getSafemodePath(cluster);
+    String blurSafemodePath = ZookeeperPathConstants.getSafemodePath(_cluster);
     Stat stat = _zookeeper.exists(blurSafemodePath, false);
     if (stat == null) {
       _zookeeper.create(blurSafemodePath, Long.toString(timestamp).getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
@@ -220,11 +220,11 @@ public class DistributedIndexServer extends AbstractIndexServer {
   }
 
   private void setupAnyMissingPaths() throws KeeperException, InterruptedException {
-    String tablesPath = ZookeeperPathConstants.getTablesPath(cluster);
+    String tablesPath = ZookeeperPathConstants.getTablesPath(_cluster);
     List<String> tables = _zookeeper.getChildren(tablesPath, false);
     for (String table : tables) {
-      BlurUtil.createIfMissing(_zookeeper, ZookeeperPathConstants.getLockPath(cluster, table));
-      BlurUtil.createIfMissing(_zookeeper, ZookeeperPathConstants.getTableFieldNamesPath(cluster, table));
+      BlurUtil.createIfMissing(_zookeeper, ZookeeperPathConstants.getLockPath(_cluster, table));
+      BlurUtil.createIfMissing(_zookeeper, ZookeeperPathConstants.getTableFieldNamesPath(_cluster, table));
     }
   }
 
@@ -246,7 +246,7 @@ public class DistributedIndexServer extends AbstractIndexServer {
 
       private void warmup() {
         if (_running.get()) {
-          List<String> tableList = _clusterStatus.getTableList(false, cluster);
+          List<String> tableList = _clusterStatus.getTableList(false, _cluster);
           _blurMetrics.tableCount.set(tableList.size());
           long indexCount = 0;
           AtomicLong segmentCount = new AtomicLong();
@@ -287,8 +287,8 @@ public class DistributedIndexServer extends AbstractIndexServer {
 
   private void registerMyself() {
     String nodeName = getNodeName();
-    String registeredShardsPath = ZookeeperPathConstants.getRegisteredShardsPath(cluster) + "/" + nodeName;
-    String onlineShardsPath = ZookeeperPathConstants.getOnlineShardsPath(cluster) + "/" + nodeName;
+    String registeredShardsPath = ZookeeperPathConstants.getRegisteredShardsPath(_cluster) + "/" + nodeName;
+    String onlineShardsPath = ZookeeperPathConstants.getOnlineShardsPath(_cluster) + "/" + nodeName;
     try {
       if (_zookeeper.exists(registeredShardsPath, false) == null) {
         _zookeeper.create(registeredShardsPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
@@ -372,12 +372,12 @@ public class DistributedIndexServer extends AbstractIndexServer {
     List<String> tables = new ArrayList<String>(map.keySet());
     Map<String, T> removed = new HashMap<String, T>();
     for (String table : tables) {
-      if (!_clusterStatus.exists(true, cluster, table)) {
+      if (!_clusterStatus.exists(true, _cluster, table)) {
         removed.put(table, map.remove(table));
       }
     }
     for (String table : tables) {
-      if (!_clusterStatus.isEnabled(true, cluster, table)) {
+      if (!_clusterStatus.isEnabled(true, _cluster, table)) {
         removed.put(table, map.remove(table));
       }
     }
@@ -481,7 +481,7 @@ public class DistributedIndexServer extends AbstractIndexServer {
     Directory directory = new HdfsDirectory(hdfsDirPath);
     directory.setLockFactory(lockFactory);
 
-    TableDescriptor descriptor = _clusterStatus.getTableDescriptor(true, cluster, table);
+    TableDescriptor descriptor = _clusterStatus.getTableDescriptor(true, _cluster, table);
     String compressionClass = descriptor.compressionClass;
     int compressionBlockSize = descriptor.compressionBlockSize;
     if (compressionClass != null) {
@@ -495,16 +495,16 @@ public class DistributedIndexServer extends AbstractIndexServer {
     }
 
     Directory dir;
-    boolean blockCacheEnabled = _clusterStatus.isBlockCacheEnabled(cluster, table);
+    boolean blockCacheEnabled = _clusterStatus.isBlockCacheEnabled(_cluster, table);
     if (blockCacheEnabled) {
-      Set<String> blockCacheFileTypes = _clusterStatus.getBlockCacheFileTypes(cluster, table);
+      Set<String> blockCacheFileTypes = _clusterStatus.getBlockCacheFileTypes(_cluster, table);
       dir = new BlockDirectory(table + "_" + shard, directory, _cache, blockCacheFileTypes);
     } else {
       dir = directory;
     }
 
     BlurIndex index;
-    if (_clusterStatus.isReadOnly(true, cluster, table)) {
+    if (_clusterStatus.isReadOnly(true, _cluster, table)) {
       BlurIndexReader reader = new BlurIndexReader();
       reader.setCloser(_closer);
       reader.setAnalyzer(getAnalyzer(table));
@@ -534,7 +534,7 @@ public class DistributedIndexServer extends AbstractIndexServer {
       index = writer;
     }
     _filterCache.opening(table, shard, index);
-    TableDescriptor tableDescriptor = _clusterStatus.getTableDescriptor(true, cluster, table);
+    TableDescriptor tableDescriptor = _clusterStatus.getTableDescriptor(true, _cluster, table);
     return warmUp(index, tableDescriptor, shard);
   }
 
@@ -712,7 +712,7 @@ public class DistributedIndexServer extends AbstractIndexServer {
     checkTable(table);
     Similarity similarity = _tableSimilarity.get(table);
     if (similarity == null) {
-      TableDescriptor tableDescriptor = _clusterStatus.getTableDescriptor(true, cluster, table);
+      TableDescriptor tableDescriptor = _clusterStatus.getTableDescriptor(true, _cluster, table);
       String similarityClass = tableDescriptor.similarityClass;
       if (similarityClass == null) {
         similarity = new FairSimilarity();
@@ -736,7 +736,7 @@ public class DistributedIndexServer extends AbstractIndexServer {
   @Override
   public TABLE_STATUS getTableStatus(String table) {
     checkTable(table);
-    boolean enabled = _clusterStatus.isEnabled(true, cluster, table);
+    boolean enabled = _clusterStatus.isEnabled(true, _cluster, table);
     if (enabled) {
       return TABLE_STATUS.ENABLED;
     }
@@ -744,7 +744,7 @@ public class DistributedIndexServer extends AbstractIndexServer {
   }
 
   private void checkTable(String table) {
-    if (_clusterStatus.exists(true, cluster, table)) {
+    if (_clusterStatus.exists(true, _cluster, table)) {
       return;
     }
     throw new RuntimeException("Table [" + table + "] does not exist.");
@@ -760,7 +760,7 @@ public class DistributedIndexServer extends AbstractIndexServer {
   private TableDescriptor getTableDescriptor(String table) {
     TableDescriptor tableDescriptor = _tableDescriptors.get(table);
     if (tableDescriptor == null) {
-      tableDescriptor = _clusterStatus.getTableDescriptor(true, cluster, table);
+      tableDescriptor = _clusterStatus.getTableDescriptor(true, _cluster, table);
       _tableDescriptors.put(table, tableDescriptor);
     }
     return tableDescriptor;
@@ -840,4 +840,8 @@ public class DistributedIndexServer extends AbstractIndexServer {
   public void setTimeBetweenRefreshs(long timeBetweenRefreshs) {
     _timeBetweenRefreshs = timeBetweenRefreshs;
   }
+  
+  public void setClusterName(String cluster) {
+    _cluster = cluster;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9f11647d/src/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java b/src/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
index c403b8c..7545cc3 100644
--- a/src/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
+++ b/src/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
@@ -16,6 +16,10 @@ package org.apache.blur.thrift;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_CACHE_MAX_QUERYCACHE_ELEMENTS;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_CACHE_MAX_TIMETOLIVE;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_DATA_FETCH_THREAD_COUNT;
+
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
@@ -68,6 +72,13 @@ public class BlurShardServer extends TableAdmin implements Iface {
   public void init() {
     _queryCache = new QueryCache("shard-cache", _maxQueryCacheElements, _maxTimeToLive);
     _dataFetch = Executors.newThreadPool("data-fetch-", _dataFetchThreadCount);
+    
+    if(_configuration != null) {
+      _cluster = _configuration.get(BlurConstants.BLUR_CLUSTER_NAME, BlurConstants.BLUR_CLUSTER);
+      _dataFetchThreadCount = _configuration.getInt(BLUR_SHARD_DATA_FETCH_THREAD_COUNT, 8);
+      _maxQueryCacheElements = _configuration.getInt(BLUR_SHARD_CACHE_MAX_QUERYCACHE_ELEMENTS, 128);
+      _maxTimeToLive = _configuration.getLong(BLUR_SHARD_CACHE_MAX_TIMETOLIVE, TimeUnit.MINUTES.toMillis(1));
     
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9f11647d/src/blur-core/src/main/java/org/apache/blur/thrift/TableAdmin.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/TableAdmin.java b/src/blur-core/src/main/java/org/apache/blur/thrift/TableAdmin.java
index e8a7331..dc3ecff 100644
--- a/src/blur-core/src/main/java/org/apache/blur/thrift/TableAdmin.java
+++ b/src/blur-core/src/main/java/org/apache/blur/thrift/TableAdmin.java
@@ -308,6 +308,10 @@ public abstract class TableAdmin implements Iface {
   public void setZookeeper(ZooKeeper zookeeper) {
     _zookeeper = zookeeper;
   }
+  
+  public void setConfiguration(BlurConfiguration config) {
+    _configuration = config;
+  }
 
   @Override
   public Map<String, String> configuration() throws BlurException, TException {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9f11647d/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java b/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
index 7a6ea44..55b6c60 100644
--- a/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
+++ b/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
@@ -17,6 +17,7 @@ package org.apache.blur.thrift;
  * limitations under the License.
  */
 import static org.apache.blur.utils.BlurConstants.BLUR_CLUSTER_NAME;
+import static org.apache.blur.utils.BlurConstants.BLUR_CLUSTER;
 import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_BIND_PORT;
 import static org.apache.blur.utils.BlurConstants.BLUR_GUI_CONTROLLER_PORT;
 import static org.apache.blur.utils.BlurConstants.BLUR_GUI_SHARD_PORT;
@@ -183,6 +184,7 @@ public class ThriftBlurShardServer extends ThriftServer {
     indexServer.setBlurMetrics(blurMetrics);
     indexServer.setCache(cache);
     indexServer.setClusterStatus(clusterStatus);
+    indexServer.setClusterName(configuration.get(BLUR_CLUSTER_NAME, BLUR_CLUSTER));
     indexServer.setConfiguration(config);
     indexServer.setNodeName(nodeName);
     indexServer.setRefresher(refresher);
@@ -209,10 +211,8 @@ public class ThriftBlurShardServer extends ThriftServer {
     shardServer.setIndexManager(indexManager);
     shardServer.setZookeeper(zooKeeper);
     shardServer.setClusterStatus(clusterStatus);
-    shardServer.setDataFetchThreadCount(configuration.getInt(BLUR_SHARD_DATA_FETCH_THREAD_COUNT, 8));
-    shardServer.setMaxQueryCacheElements(configuration.getInt(BLUR_SHARD_CACHE_MAX_QUERYCACHE_ELEMENTS, 128));
-    shardServer.setMaxTimeToLive(configuration.getLong(BLUR_SHARD_CACHE_MAX_TIMETOLIVE, TimeUnit.MINUTES.toMillis(1)));
     shardServer.setQueryChecker(queryChecker);
+    shardServer.setConfiguration(configuration);
     shardServer.init();
 
     Iface iface = BlurUtil.recordMethodCallsAndAverageTimes(blurMetrics, shardServer, Iface.class);


Mime
View raw message