incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject git commit: Refactoring some code to cleanup the startup of IndexManager.
Date Sun, 27 Oct 2013 00:31:16 GMT
Updated Branches:
  refs/heads/master e75fab65f -> 609763bf7


Refactoring some code to cleanup the startup of IndexManager.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/609763bf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/609763bf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/609763bf

Branch: refs/heads/master
Commit: 609763bf7958870ff62ab201344ff66c367b0c99
Parents: e75fab6
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Sat Oct 26 20:30:59 2013 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Sat Oct 26 20:31:16 2013 -0400

----------------------------------------------------------------------
 .../apache/blur/manager/BlurFilterCache.java    |  32 +++++-
 .../blur/manager/DefaultBlurFilterCache.java    |   5 +
 .../org/apache/blur/manager/IndexManager.java   | 100 ++++++++-----------
 .../blur/thrift/ThriftBlurShardServer.java      |  23 +++--
 .../apache/blur/manager/IndexManagerTest.java   |  19 ++--
 5 files changed, 95 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/609763bf/blur-core/src/main/java/org/apache/blur/manager/BlurFilterCache.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/BlurFilterCache.java b/blur-core/src/main/java/org/apache/blur/manager/BlurFilterCache.java
index b200e6f..9c666b7 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/BlurFilterCache.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/BlurFilterCache.java
@@ -16,6 +16,7 @@ package org.apache.blur.manager;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+import org.apache.blur.BlurConfiguration;
 import org.apache.blur.lucene.search.SuperParser;
 import org.apache.blur.manager.writer.BlurIndex;
 import org.apache.blur.thrift.generated.Record;
@@ -29,6 +30,12 @@ import org.apache.lucene.search.Filter;
  */
 public abstract class BlurFilterCache {
 
+  protected final BlurConfiguration _configuration;
+
+  public BlurFilterCache(BlurConfiguration configuration) {
+    _configuration = configuration;
+  }
+
   /**
    * The fetchPreFilter method fetches the cache pre-filter (or {@link Record}
    * Filter) before attempting to execute the filter provided by the user.
@@ -68,8 +75,8 @@ public abstract class BlurFilterCache {
 
   /**
    * The storePreFilter method stores the parsed post {@link Filter} (or
-   * {@link Row} Filter) for caching, and should return the {@link Filter} to
-   * be executed.
+   * {@link Row} Filter) for caching, and should return the {@link Filter} to be
+   * executed.
    * 
    * @param table
    *          the table name.
@@ -79,9 +86,28 @@ public abstract class BlurFilterCache {
    */
   public abstract Filter storePostFilter(String table, String filterStr, Filter filter);
 
-
+  /**
+   * Notifies the cache that the index is closing on this shard server.
+   * 
+   * @param table
+   *          the table name.
+   * @param shard
+   *          the shard name.
+   * @param index
+   *          the {@link BlurIndex}.
+   */
   public abstract void closing(String table, String shard, BlurIndex index);
 
+  /**
+   * Notifies the cache that the index is opening on this shard server.
+   * 
+   * @param table
+   *          the table name.
+   * @param shard
+   *          the shard name.
+   * @param index
+   *          the {@link BlurIndex}.
+   */
   public abstract void opening(String table, String shard, BlurIndex index);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/609763bf/blur-core/src/main/java/org/apache/blur/manager/DefaultBlurFilterCache.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/DefaultBlurFilterCache.java b/blur-core/src/main/java/org/apache/blur/manager/DefaultBlurFilterCache.java
index b0a6122..3a9c0b9 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/DefaultBlurFilterCache.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/DefaultBlurFilterCache.java
@@ -16,6 +16,7 @@ package org.apache.blur.manager;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+import org.apache.blur.BlurConfiguration;
 import org.apache.blur.manager.writer.BlurIndex;
 import org.apache.lucene.search.Filter;
 
@@ -25,6 +26,10 @@ import org.apache.lucene.search.Filter;
  */
 public class DefaultBlurFilterCache extends BlurFilterCache {
 
+  public DefaultBlurFilterCache(BlurConfiguration configuration) {
+    super(configuration);
+  }
+
   @Override
   public Filter storePreFilter(String table, String filterStr, Filter filter) {
     return filter;

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/609763bf/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java b/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
index aa4dfe6..739b92d 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
@@ -125,34 +125,38 @@ public class IndexManager {
   private static final String NOT_FOUND = "NOT_FOUND";
   private static final Log LOG = LogFactory.getLog(IndexManager.class);
 
-  private IndexServer _indexServer;
-  private ClusterStatus _clusterStatus;
-  private ExecutorService _executor;
-  private ExecutorService _mutateExecutor;
-  private int _threadCount;
-  private QueryStatusManager _statusManager = new QueryStatusManager();
-  private boolean _closed;
-  private BlurPartitioner _blurPartitioner = new BlurPartitioner();
-  private BlurFilterCache _filterCache = new DefaultBlurFilterCache();
-  private long _defaultParallelCallTimeout = TimeUnit.MINUTES.toMillis(1);
-  private Meter _readRecordsMeter;
-  private Meter _readRowMeter;
-  private Meter _writeRecordsMeter;
-  private Meter _writeRowMeter;
-  private Meter _queriesExternalMeter;
-  private Meter _queriesInternalMeter;
-  private Timer _fetchTimer;
-  private int _fetchCount = 100;
-  private int _maxHeapPerRowFetch = 10000000;
-  private int _mutateThreadCount;
+  private final Meter _readRecordsMeter;
+  private final Meter _readRowMeter;
+  private final Meter _writeRecordsMeter;
+  private final Meter _writeRowMeter;
+  private final Meter _queriesExternalMeter;
+  private final Meter _queriesInternalMeter;
+
+  private final IndexServer _indexServer;
+  private final ClusterStatus _clusterStatus;
+  private final ExecutorService _executor;
+  private final ExecutorService _mutateExecutor;
+
+  private final QueryStatusManager _statusManager = new QueryStatusManager();
+  private final AtomicBoolean _closed = new AtomicBoolean(false);
+  private final BlurPartitioner _blurPartitioner = new BlurPartitioner();
+  private final BlurFilterCache _filterCache;
+  private final long _defaultParallelCallTimeout = TimeUnit.MINUTES.toMillis(1);
+
+  private final Timer _fetchTimer;
+  private final int _fetchCount;
+  private final int _maxHeapPerRowFetch;
+
+  private final int _threadCount;
+  private final int _mutateThreadCount;
 
   public static AtomicBoolean DEBUG_RUN_SLOW = new AtomicBoolean(false);
 
-  public void setMaxClauseCount(int maxClauseCount) {
-    BooleanQuery.setMaxClauseCount(maxClauseCount);
-  }
-
-  public void init() {
+  public IndexManager(IndexServer indexServer, ClusterStatus clusterStatus, BlurFilterCache filterCache,
+      int maxHeapPerRowFetch, int fetchCount, int threadCount, int mutateThreadCount, long statusCleanupTimerDelay) {
+    _indexServer = indexServer;
+    _clusterStatus = clusterStatus;
+    _filterCache = filterCache;
     _readRecordsMeter = Metrics.newMeter(new MetricName(ORG_APACHE_BLUR, BLUR, "Read Records/s"), "Records/s",
         TimeUnit.SECONDS);
     _readRowMeter = Metrics.newMeter(new MetricName(ORG_APACHE_BLUR, BLUR, "Read Row/s"), "Row/s", TimeUnit.SECONDS);
@@ -166,22 +170,28 @@ public class IndexManager {
         "Internal Queries/s", TimeUnit.SECONDS);
     _fetchTimer = Metrics.newTimer(new MetricName(ORG_APACHE_BLUR, BLUR, "Fetch Timer"), TimeUnit.MICROSECONDS,
         TimeUnit.SECONDS);
-    if (_threadCount == 0) {
+    if (threadCount == 0) {
       throw new RuntimeException("Thread Count cannot be 0.");
     }
-    if (_mutateThreadCount == 0) {
+    _threadCount = threadCount;
+    if (mutateThreadCount == 0) {
       throw new RuntimeException("Mutate Thread Count cannot be 0.");
     }
+    _mutateThreadCount = mutateThreadCount;
+    _fetchCount = fetchCount;
+    _maxHeapPerRowFetch = maxHeapPerRowFetch;
+
     _executor = Executors.newThreadPool("index-manager", _threadCount);
     _mutateExecutor = Executors.newThreadPool("index-manager-mutate", _mutateThreadCount);
+    _statusManager.setStatusCleanupTimerDelay(statusCleanupTimerDelay);
     _statusManager.init();
     LOG.info("Init Complete");
 
   }
 
   public synchronized void close() {
-    if (!_closed) {
-      _closed = true;
+    if (!_closed.get()) {
+      _closed.set(true);
       _statusManager.close();
       _executor.shutdownNow();
       _mutateExecutor.shutdownNow();
@@ -643,10 +653,6 @@ public class IndexManager {
     return _indexServer;
   }
 
-  public void setIndexServer(IndexServer indexServer) {
-    this._indexServer = indexServer;
-  }
-
   public long recordFrequency(final String table, final String columnFamily, final String columnName, final String value)
       throws Exception {
     Map<String, BlurIndex> blurIndexes;
@@ -800,10 +806,6 @@ public class IndexManager {
     return columnDefinition;
   }
 
-  public void setStatusCleanupTimerDelay(long delay) {
-    _statusManager.setStatusCleanupTimerDelay(delay);
-  }
-
   public void mutate(final RowMutation mutation) throws BlurException, IOException {
     long s = System.nanoTime();
     doMutate(mutation);
@@ -1135,18 +1137,6 @@ public class IndexManager {
     }
   }
 
-  public void setThreadCount(int threadCount) {
-    _threadCount = threadCount;
-  }
-
-  public void setMutateThreadCount(int mutateThreadCount) {
-    _mutateThreadCount = mutateThreadCount;
-  }
-
-  public void setFilterCache(BlurFilterCache filterCache) {
-    _filterCache = filterCache;
-  }
-
   public void optimize(String table, int numberOfSegmentsPerShard) throws BException {
     Map<String, BlurIndex> blurIndexes;
     try {
@@ -1167,16 +1157,4 @@ public class IndexManager {
     }
   }
 
-  public void setClusterStatus(ClusterStatus clusterStatus) {
-    _clusterStatus = clusterStatus;
-  }
-
-  public void setFetchCount(int fetchCount) {
-    _fetchCount = fetchCount;
-  }
-
-  public void setMaxHeapPerRowFetch(int maxHeapPerRowFetch) {
-    _maxHeapPerRowFetch = maxHeapPerRowFetch;
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/609763bf/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
index ffed891..5837fc4 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
@@ -87,6 +87,7 @@ import org.apache.blur.utils.GCWatcher;
 import org.apache.blur.utils.MemoryReporter;
 import org.apache.blur.zookeeper.ZkUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.lucene.search.BooleanQuery;
 import org.apache.zookeeper.ZooKeeper;
 import org.mortbay.jetty.servlet.ServletHolder;
 import org.mortbay.jetty.webapp.WebAppContext;
@@ -196,16 +197,16 @@ public class ThriftBlurShardServer extends ThriftServer {
         indexWarmup, filterCache, blockCacheDirectoryFactory, distributedLayoutFactory, cluster, nodeName,
         safeModeDelay, shardOpenerThreadCount, internalSearchThreads, warmupThreads, maxMergeThreads);
 
-    final IndexManager indexManager = new IndexManager();
-    indexManager.setIndexServer(indexServer);
-    indexManager.setMaxClauseCount(configuration.getInt(BLUR_MAX_CLAUSE_COUNT, 1024));
-    indexManager.setThreadCount(configuration.getInt(BLUR_INDEXMANAGER_SEARCH_THREAD_COUNT, 32));
-    indexManager.setMutateThreadCount(configuration.getInt(BLUR_INDEXMANAGER_MUTATE_THREAD_COUNT, 32));
-    indexManager.setFilterCache(filterCache);
-    indexManager.setClusterStatus(clusterStatus);
-    indexManager.setFetchCount(configuration.getInt(BLUR_SHARD_FETCHCOUNT, 100));
-    indexManager.setMaxHeapPerRowFetch(configuration.getInt(BLUR_MAX_HEAP_PER_ROW_FETCH, 10000000));
-    indexManager.init();
+    BooleanQuery.setMaxClauseCount(configuration.getInt(BLUR_MAX_CLAUSE_COUNT, 1024));
+
+    int maxHeapPerRowFetch = configuration.getInt(BLUR_MAX_HEAP_PER_ROW_FETCH, 10000000);
+    int fetchCount = configuration.getInt(BLUR_SHARD_FETCHCOUNT, 100);
+    int indexManagerThreadCount = configuration.getInt(BLUR_INDEXMANAGER_SEARCH_THREAD_COUNT, 32);
+    int mutateThreadCount = configuration.getInt(BLUR_INDEXMANAGER_MUTATE_THREAD_COUNT, 32);
+    long statusCleanupTimerDelay = TimeUnit.SECONDS.toMillis(10);
+    
+    final IndexManager indexManager = new IndexManager(indexServer, clusterStatus, filterCache, maxHeapPerRowFetch,
+        fetchCount, indexManagerThreadCount, mutateThreadCount, statusCleanupTimerDelay);
 
     final BlurShardServer shardServer = new BlurShardServer();
     shardServer.setIndexServer(indexServer);
@@ -267,7 +268,7 @@ public class ThriftBlurShardServer extends ThriftServer {
         throw new RuntimeException(e);
       }
     }
-    return new DefaultBlurFilterCache();
+    return new DefaultBlurFilterCache(configuration);
   }
 
   private static BlurIndexWarmup getIndexWarmup(BlurConfiguration configuration) {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/609763bf/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java b/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java
index 1573363..fd4e287 100644
--- a/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java
+++ b/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java
@@ -45,6 +45,7 @@ import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicLongArray;
 
+import org.apache.blur.BlurConfiguration;
 import org.apache.blur.manager.clusterstatus.ClusterStatus;
 import org.apache.blur.manager.indexserver.LocalIndexServer;
 import org.apache.blur.manager.results.BlurResultIterable;
@@ -103,12 +104,14 @@ public class IndexManagerTest {
     tableDescriptor.setShardCount(1);
     server = new LocalIndexServer(tableDescriptor);
 
-    indexManager = new IndexManager();
-    indexManager.setStatusCleanupTimerDelay(1000);
-    indexManager.setIndexServer(server);
-    indexManager.setThreadCount(1);
-    indexManager.setMutateThreadCount(1);
-    indexManager.setClusterStatus(new ClusterStatus() {
+    BlurFilterCache filterCache = new DefaultBlurFilterCache(new BlurConfiguration());
+    long statusCleanupTimerDelay = 1000;
+    indexManager = new IndexManager(server,getClusterStatus(tableDescriptor),filterCache,10000000,100,1,1,statusCleanupTimerDelay);
+    setupData();
+  }
+
+  private ClusterStatus getClusterStatus(final TableDescriptor tableDescriptor) {
+    return new ClusterStatus() {
 
       @Override
       public void removeTable(String cluster, String table, boolean deleteIndexFiles) {
@@ -209,9 +212,7 @@ public class IndexManagerTest {
       public void createTable(TableDescriptor tableDescriptor) {
         throw new RuntimeException("Not impl");
       }
-    });
-    indexManager.init();
-    setupData();
+    };
   }
 
   @After


Mime
View raw message