incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [8/9] Tons of changes to make the shell work. This has uncovered some limitations in the QueryArgs object. Refactored to allow for different query types.
Date Mon, 31 Dec 2012 20:44:03 GMT
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f92d94a9/src/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatus.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatus.java b/src/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatus.java
index cf78777..8220d40 100644
--- a/src/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatus.java
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatus.java
@@ -20,29 +20,26 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.blur.analysis.BlurAnalyzer;
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
+import org.apache.blur.thrift.generated.Analyzer;
 import org.apache.blur.thrift.generated.TableDescriptor;
 import org.apache.blur.utils.BlurUtil;
 import org.apache.blur.zookeeper.WatchChildren;
 import org.apache.blur.zookeeper.WatchChildren.OnChange;
 import org.apache.blur.zookeeper.WatchNodeData;
 import org.apache.blur.zookeeper.WatchNodeExistance;
-import org.apache.blur.zookeeper.ZkUtils;
-import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.Stat;
 
@@ -55,24 +52,17 @@ public class ZookeeperClusterStatus extends ClusterStatus {
   private ConcurrentMap<String, Long> _safeModeMap = new ConcurrentHashMap<String, Long>();
   private ConcurrentMap<String, List<String>> _onlineShardsNodes = new ConcurrentHashMap<String, List<String>>();
   private ConcurrentMap<String, Set<String>> _tablesPerCluster = new ConcurrentHashMap<String, Set<String>>();
-  private AtomicReference<Set<String>> _clusters = new AtomicReference<Set<String>>(new HashSet<String>());
-  private ConcurrentMap<String, Boolean> _enabled = new ConcurrentHashMap<String, Boolean>();
-  private ConcurrentMap<String, Boolean> _readOnly = new ConcurrentHashMap<String, Boolean>();
 
-  private WatchChildren _clusterWatcher;
   private ConcurrentMap<String, WatchChildren> _onlineShardsNodesWatchers = new ConcurrentHashMap<String, WatchChildren>();
   private ConcurrentMap<String, WatchChildren> _tableWatchers = new ConcurrentHashMap<String, WatchChildren>();
   private ConcurrentMap<String, WatchNodeExistance> _safeModeWatchers = new ConcurrentHashMap<String, WatchNodeExistance>();
   private ConcurrentMap<String, WatchNodeData> _safeModeDataWatchers = new ConcurrentHashMap<String, WatchNodeData>();
-  private ConcurrentMap<String, WatchNodeExistance> _enabledWatchNodeExistance = new ConcurrentHashMap<String, WatchNodeExistance>();
-  private ConcurrentMap<String, WatchNodeExistance> _readOnlyWatchNodeExistance = new ConcurrentHashMap<String, WatchNodeExistance>();
   private String _cluster;
 
   public ZookeeperClusterStatus(String cluster, ZooKeeper zooKeeper) {
     _zk = zooKeeper;
     _running.set(true);
     _cluster = cluster;
-    watchForClusters();
     try {
       Thread.sleep(1000);
     } catch (InterruptedException e) {
@@ -80,36 +70,6 @@ public class ZookeeperClusterStatus extends ClusterStatus {
     }
   }
 
-  class Clusters extends OnChange {
-    @Override
-    public void action(List<String> clusters) {
-      _clusters.set(new HashSet<String>(clusters));
-      for (String cluster : clusters) {
-        if (!_tableWatchers.containsKey(cluster)) {
-          String tablesPath = ZookeeperPathConstants.getTablesPath(cluster);
-          ZkUtils.waitUntilExists(_zk, tablesPath);
-          WatchChildren clusterWatcher = new WatchChildren(_zk, tablesPath).watch(new Tables(cluster));
-          _tableWatchers.put(cluster, clusterWatcher);
-          String safemodePath = ZookeeperPathConstants.getSafemodePath(cluster);
-          ZkUtils.waitUntilExists(_zk, safemodePath);
-          WatchNodeExistance watchNodeExistance = new WatchNodeExistance(_zk, safemodePath).watch(new SafeExistance(cluster));
-          _safeModeWatchers.put(cluster, watchNodeExistance);
-        }
-      }
-
-      List<String> clustersToCloseAndRemove = new ArrayList<String>(clusters);
-      clustersToCloseAndRemove.removeAll(_tableWatchers.keySet());
-      for (String cluster : clustersToCloseAndRemove) {
-        WatchChildren watcher = _tableWatchers.remove(cluster);
-        if (watcher == null) {
-          LOG.error("Error watcher is null [" + cluster + "] ");
-        } else {
-          watcher.close();
-        }
-      }
-    }
-  }
-
   class SafeExistance extends WatchNodeExistance.OnChange {
 
     private String cluster;
@@ -143,66 +103,6 @@ public class ZookeeperClusterStatus extends ClusterStatus {
     }
   }
 
-  class Tables extends OnChange {
-    private String cluster;
-
-    public Tables(String cluster) {
-      this.cluster = cluster;
-    }
-
-    @Override
-    public void action(List<String> tables) {
-      Set<String> newSet = new HashSet<String>(tables);
-      Set<String> oldSet = _tablesPerCluster.put(cluster, newSet);
-      Set<String> newTables = getNewTables(newSet, oldSet);
-      for (String table : newTables) {
-        final String clusterTableKey = getClusterTableKey(cluster, table);
-
-        WatchNodeExistance readOnlyWatcher = new WatchNodeExistance(_zk, ZookeeperPathConstants.getTableReadOnlyPath(cluster, table));
-        readOnlyWatcher.watch(new WatchNodeExistance.OnChange() {
-          @Override
-          public void action(Stat stat) {
-            if (stat == null) {
-              _readOnly.put(clusterTableKey, Boolean.FALSE);
-            } else {
-              _readOnly.put(clusterTableKey, Boolean.TRUE);
-            }
-          }
-        });
-        if (_readOnlyWatchNodeExistance.putIfAbsent(clusterTableKey, readOnlyWatcher) != null) {
-          readOnlyWatcher.close();
-        }
-
-        WatchNodeExistance enabledWatcher = new WatchNodeExistance(_zk, ZookeeperPathConstants.getTableEnabledPath(cluster, table));
-        enabledWatcher.watch(new WatchNodeExistance.OnChange() {
-          @Override
-          public void action(Stat stat) {
-            if (stat == null) {
-              _enabled.put(clusterTableKey, Boolean.FALSE);
-            } else {
-              _enabled.put(clusterTableKey, Boolean.TRUE);
-            }
-          }
-        });
-        if (_enabledWatchNodeExistance.putIfAbsent(clusterTableKey, enabledWatcher) != null) {
-          enabledWatcher.close();
-        }
-      }
-    }
-
-    private Set<String> getNewTables(Set<String> newSet, Set<String> oldSet) {
-      Set<String> newTables = new HashSet<String>(newSet);
-      if (oldSet != null) {
-        newTables.removeAll(oldSet);
-      }
-      return newTables;
-    }
-  }
-
-  private void watchForClusters() {
-    _clusterWatcher = new WatchChildren(_zk, ZookeeperPathConstants.getClustersPath()).watch(new Clusters());
-  }
-
   public ZookeeperClusterStatus(String cluster, String connectionStr) throws IOException {
     this(cluster, new ZooKeeper(connectionStr, 30000, new Watcher() {
       @Override
@@ -212,10 +112,6 @@ public class ZookeeperClusterStatus extends ClusterStatus {
     }));
   }
 
-  private String getClusterTableKey(String cluster, String table) {
-    return cluster + "." + table;
-  }
-
   private void checkIfOpen() {
     if (_running.get()) {
       return;
@@ -367,13 +263,10 @@ public class ZookeeperClusterStatus extends ClusterStatus {
   public void close() {
     if (_running.get()) {
       _running.set(false);
-      close(_clusterWatcher);
       close(_onlineShardsNodesWatchers);
       close(_tableWatchers);
       close(_safeModeWatchers);
       close(_safeModeDataWatchers);
-      close(_enabledWatchNodeExistance);
-      close(_readOnlyWatchNodeExistance);
     }
   }
 
@@ -436,13 +329,19 @@ public class ZookeeperClusterStatus extends ClusterStatus {
       String table = BlurUtil.nullCheck(tableDescriptor.getName(), "Name cannot be null.");
       String uri = BlurUtil.nullCheck(tableDescriptor.getStoragePath(), "Storage path cannot be null.");
       int shardCount = BlurUtil.zeroCheck(tableDescriptor.shardCount, "ShardCount cannot be less than 1");
-      String blurTablePath = ZookeeperPathConstants.getTablePath(getClusterName(), table);
+      String tablePath = ZookeeperPathConstants.getTablePath(getClusterName(), table);
+
+      Analyzer analyzer = tableDescriptor.getAnalyzer();
+      if (analyzer != null) {
+        // check the analyzer to be valid
+        new BlurAnalyzer(analyzer);
+      }
 
-      if (_zk.exists(blurTablePath, false) != null) {
+      if (_zk.exists(tablePath, false) != null) {
         throw new IOException("Table [" + table + "] already exists.");
       }
       BlurUtil.setupFileSystem(uri, shardCount);
-      BlurUtil.createPath(_zk, blurTablePath, BlurUtil.read(tableDescriptor));
+      BlurUtil.createPath(_zk, tablePath, BlurUtil.read(tableDescriptor));
     } catch (IOException e) {
       throw new RuntimeException(e);
     } catch (KeeperException e) {
@@ -456,20 +355,20 @@ public class ZookeeperClusterStatus extends ClusterStatus {
   }
 
   @Override
-  public void disableTable(String table) {
+  public void disableTable(String table) throws IOException {
     long s = System.nanoTime();
     try {
       checkIfOpen();
-      if (_zk.exists(ZookeeperPathConstants.getTablePath(_cluster, table), false) == null) {
+      String tablePath = ZookeeperPathConstants.getTablePath(_cluster, table);
+      if (_zk.exists(tablePath, false) == null) {
         throw new IOException("Table [" + table + "] does not exist.");
       }
-      String blurTableEnabledPath = ZookeeperPathConstants.getTableEnabledPath(_cluster, table);
-      if (_zk.exists(blurTableEnabledPath, false) == null) {
-        throw new IOException("Table [" + table + "] already disabled.");
+      TableDescriptor tableDescriptor = getTableDescriptor(false, table);
+      if (!tableDescriptor.isEnabled()) {
+        throw new IOException("Table [" + table + "] is already disabled.");
       }
-      _zk.delete(blurTableEnabledPath, -1);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
+      tableDescriptor.setEnabled(false);
+      _zk.setData(tablePath, BlurUtil.read(tableDescriptor), -1);
     } catch (InterruptedException e) {
       throw new RuntimeException(e);
     } catch (KeeperException e) {
@@ -485,14 +384,16 @@ public class ZookeeperClusterStatus extends ClusterStatus {
     long s = System.nanoTime();
     try {
       checkIfOpen();
-      if (_zk.exists(ZookeeperPathConstants.getTablePath(_cluster, table), false) == null) {
+      String tablePath = ZookeeperPathConstants.getTablePath(_cluster, table);
+      if (_zk.exists(tablePath, false) == null) {
         throw new IOException("Table [" + table + "] does not exist.");
       }
-      String blurTableEnabledPath = ZookeeperPathConstants.getTableEnabledPath(_cluster, table);
-      if (_zk.exists(blurTableEnabledPath, false) != null) {
-        throw new IOException("Table [" + table + "] already enabled.");
+      TableDescriptor tableDescriptor = getTableDescriptor(false, table);
+      if (tableDescriptor.isEnabled()) {
+        LOG.info("Table [" + table + "] is already enabled.");
       }
-      _zk.create(blurTableEnabledPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+      tableDescriptor.setEnabled(true);
+      _zk.setData(tablePath, BlurUtil.read(tableDescriptor), -1);
     } catch (IOException e) {
       throw new RuntimeException(e);
     } catch (InterruptedException e) {
@@ -510,19 +411,15 @@ public class ZookeeperClusterStatus extends ClusterStatus {
     long s = System.nanoTime();
     try {
       checkIfOpen();
-      String blurTablePath = ZookeeperPathConstants.getTablePath(_cluster, table);
-      if (_zk.exists(blurTablePath, false) == null) {
+      String tablePath = ZookeeperPathConstants.getTablePath(_cluster, table);
+      if (_zk.exists(tablePath, false) == null) {
         throw new IOException("Table [" + table + "] does not exist.");
       }
-      if (_zk.exists(ZookeeperPathConstants.getTableEnabledPath(_cluster, table), false) != null) {
-        throw new IOException("Table [" + table + "] must be disabled before it can be removed.");
-      }
-      byte[] data = getData(ZookeeperPathConstants.getTableUriPath(_cluster, table));
-      String uri = new String(data);
-      BlurUtil.removeAll(_zk, blurTablePath);
-      if (deleteIndexFiles) {
-        BlurUtil.removeIndexFiles(uri);
+      TableDescriptor tableDescriptor = getTableDescriptor(false, table);
+      if (tableDescriptor.isEnabled()) {
+        throw new IOException("Table [" + table + "] is NOT disabled.");
       }
+      _zk.delete(tablePath, -1);
     } catch (IOException e) {
       throw new RuntimeException(e);
     } catch (InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f92d94a9/src/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperPathConstants.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperPathConstants.java b/src/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperPathConstants.java
index cb9eeed..7032f74 100644
--- a/src/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperPathConstants.java
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperPathConstants.java
@@ -35,17 +35,17 @@ public class ZookeeperPathConstants {
     return getBasePath() + "/online-controller-nodes";
   }
 
-  public static String getTableEnabledPath(String cluster, String table) {
-    return getTablePath(cluster, table) + "/enabled";
-  }
+//  public static String getTableEnabledPath(String cluster, String table) {
+//    return getTablePath(cluster, table) + "/enabled";
+//  }
 
-  public static String getTableUriPath(String cluster, String table) {
-    return getTablePath(cluster, table) + "/uri";
-  }
+//  public static String getTableUriPath(String cluster, String table) {
+//    return getTablePath(cluster, table) + "/uri";
+//  }
 
-  public static String getTableShardCountPath(String cluster, String table) {
-    return getTablePath(cluster, table) + "/shard-count";
-  }
+//  public static String getTableShardCountPath(String cluster, String table) {
+//    return getTablePath(cluster, table) + "/shard-count";
+//  }
 
   public static String getOnlinePath(String cluster) {
     return getClusterPath(cluster) + "/online";
@@ -71,44 +71,44 @@ public class ZookeeperPathConstants {
     return getClusterPath(cluster) + "/shard-nodes";
   }
 
-  public static String getTableCompressionCodecPath(String cluster, String table) {
-    return getTablePath(cluster, table) + "/compression-codec";
-  }
+//  public static String getTableCompressionCodecPath(String cluster, String table) {
+//    return getTablePath(cluster, table) + "/compression-codec";
+//  }
 
-  public static String getTableCompressionBlockSizePath(String cluster, String table) {
-    return getTablePath(cluster, table) + "/compression-blocksize";
-  }
+//  public static String getTableCompressionBlockSizePath(String cluster, String table) {
+//    return getTablePath(cluster, table) + "/compression-blocksize";
+//  }
 
-  public static String getLockPath(String cluster, String table) {
-    return getTablePath(cluster, table) + "/locks";
-  }
+//  public static String getLockPath(String cluster, String table) {
+//    return getTablePath(cluster, table) + "/locks";
+//  }
 
-  public static String getTableBlockCachingFileTypesPath(String cluster, String table) {
-    return getTablePath(cluster, table) + "/blockcachingfiletypes";
-  }
+//  public static String getTableBlockCachingFileTypesPath(String cluster, String table) {
+//    return getTablePath(cluster, table) + "/blockcachingfiletypes";
+//  }
 
-  public static String getTableBlockCachingPath(String cluster, String table) {
-    return getTablePath(cluster, table) + "/blockcaching";
-  }
+//  public static String getTableBlockCachingPath(String cluster, String table) {
+//    return getTablePath(cluster, table) + "/blockcaching";
+//  }
 
-  public static String getTableSimilarityPath(String cluster, String table) {
-    return getTablePath(cluster, table) + "/similarity";
-  }
+//  public static String getTableSimilarityPath(String cluster, String table) {
+//    return getTablePath(cluster, table) + "/similarity";
+//  }
 
-  public static String getTableFieldNamesPath(String cluster, String table) {
-    return getTablePath(cluster, table) + "/fieldnames";
-  }
+//  public static String getTableFieldNamesPath(String cluster, String table) {
+//    return getTablePath(cluster, table) + "/fieldnames";
+//  }
 
-  public static String getTableFieldNamesPath(String cluster, String table, String fieldName) {
-    return getTableFieldNamesPath(cluster, table) + "/" + fieldName;
-  }
+//  public static String getTableFieldNamesPath(String cluster, String table, String fieldName) {
+//    return getTableFieldNamesPath(cluster, table) + "/" + fieldName;
+//  }
 
-  public static String getTableReadOnlyPath(String cluster, String table) {
-    return getTablePath(cluster, table) + "/readonly";
-  }
+//  public static String getTableReadOnlyPath(String cluster, String table) {
+//    return getTablePath(cluster, table) + "/readonly";
+//  }
 
-  public static String getTableColumnsToPreCache(String cluster, String table) {
-    return getTablePath(cluster, table) + "/precache";
-  }
+//  public static String getTableColumnsToPreCache(String cluster, String table) {
+//    return getTablePath(cluster, table) + "/precache";
+//  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f92d94a9/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java b/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
index c59e93a..033a957 100644
--- a/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
@@ -55,7 +55,8 @@ import org.apache.blur.metrics.BlurMetrics;
 import org.apache.blur.store.blockcache.BlockDirectory;
 import org.apache.blur.store.blockcache.Cache;
 import org.apache.blur.store.hdfs.BlurLockFactory;
-import org.apache.blur.store.hdfs.HdfsDirectory;
+import org.apache.blur.thrift.ShardContext;
+import org.apache.blur.thrift.TableContext;
 import org.apache.blur.thrift.generated.TableDescriptor;
 import org.apache.blur.utils.BlurConstants;
 import org.apache.blur.utils.BlurUtil;
@@ -66,7 +67,6 @@ import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.lucene.index.IndexDeletionPolicy;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.search.similarities.Similarity;
 import org.apache.lucene.store.Directory;
@@ -76,10 +76,8 @@ import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.Stat;
 
-
 public class DistributedIndexServer extends AbstractIndexServer {
 
-  private static final String LOGS = "logs";
   private static final Log LOG = LogFactory.getLog(DistributedIndexServer.class);
   private static final long _delay = TimeUnit.SECONDS.toMillis(5);
 
@@ -103,17 +101,15 @@ public class DistributedIndexServer extends AbstractIndexServer {
 
   // set internally
   private Timer _timerCacheFlush;
+  private Timer _timerTableWarmer;
+  
   private ExecutorService _openerService;
   private BlurIndexCloser _closer;
-  private Timer _timerTableWarmer;
   private BlurFilterCache _filterCache;
   private AtomicBoolean _running = new AtomicBoolean();
   private long _safeModeDelay;
   private BlurIndexWarmup _warmup = new DefaultBlurIndexWarmup();
-  private IndexDeletionPolicy _indexDeletionPolicy;
   private DirectoryReferenceFileGC _gc;
-  private long _timeBetweenCommits = TimeUnit.SECONDS.toMillis(60);
-  private long _timeBetweenRefreshs = TimeUnit.MILLISECONDS.toMillis(500);
   private WatchChildren _watchOnlineShards;
 
   public static interface ReleaseReader {
@@ -206,16 +202,6 @@ public class DistributedIndexServer extends AbstractIndexServer {
     } else {
       _zookeeper.setData(blurSafemodePath, Long.toString(timestamp).getBytes(), -1);
     }
-    setupAnyMissingPaths();
-  }
-
-  private void setupAnyMissingPaths() throws KeeperException, InterruptedException {
-    String tablesPath = ZookeeperPathConstants.getTablesPath(_cluster);
-    List<String> tables = _zookeeper.getChildren(tablesPath, false);
-    for (String table : tables) {
-      BlurUtil.createIfMissing(_zookeeper, ZookeeperPathConstants.getLockPath(_cluster, table));
-      BlurUtil.createIfMissing(_zookeeper, ZookeeperPathConstants.getTableFieldNamesPath(_cluster, table));
-    }
   }
 
   private void setupTableWarmer() {
@@ -260,18 +246,18 @@ public class DistributedIndexServer extends AbstractIndexServer {
 
       private void updateMetrics(BlurMetrics blurMetrics, Map<String, BlurIndex> indexes, AtomicLong segmentCount, AtomicLong indexMemoryUsage) throws IOException {
         // @TODO not sure how to do this yet
-//        for (BlurIndex index : indexes.values()) {
-//          IndexReader reader = index.getIndexReader();
-//          try {
-//            IndexReader[] readers = reader.getSequentialSubReaders();
-//            if (readers != null) {
-//              segmentCount.addAndGet(readers.length);
-//            }
-//            indexMemoryUsage.addAndGet(BlurUtil.getMemoryUsage(reader));
-//          } finally {
-//            reader.decRef();
-//          }
-//        }
+        // for (BlurIndex index : indexes.values()) {
+        // IndexReader reader = index.getIndexReader();
+        // try {
+        // IndexReader[] readers = reader.getSequentialSubReaders();
+        // if (readers != null) {
+        // segmentCount.addAndGet(readers.length);
+        // }
+        // indexMemoryUsage.addAndGet(BlurUtil.getMemoryUsage(reader));
+        // } finally {
+        // reader.decRef();
+        // }
+        // }
       }
     }, _delay, _delay);
   }
@@ -377,6 +363,12 @@ public class DistributedIndexServer extends AbstractIndexServer {
   }
 
   @Override
+  public TableContext getTableContext(String table) throws IOException {
+    TableDescriptor tableDescriptor = _clusterStatus.getTableDescriptor(false, table);
+    return TableContext.create(tableDescriptor);
+  }
+
+  @Override
   public void close() {
     if (_running.get()) {
       _running.set(false);
@@ -448,42 +440,30 @@ public class DistributedIndexServer extends AbstractIndexServer {
     }
   }
 
-  private BlurIndex openShard(String table, String shard) throws IOException {
+  private BlurIndex openShard(TableContext tableContext, String table, String shard) throws IOException {
     LOG.info("Opening shard [{0}] for table [{1}]", shard, table);
-    TableDescriptor tableDescriptor = _clusterStatus.getTableDescriptor(true, table);
-    Path tablePath = new Path(tableDescriptor.getStoragePath());
-    Path walTablePath = new Path(tablePath, LOGS);
-    Path hdfsDirPath = new Path(tablePath, shard);
+    ShardContext shardContext = ShardContext.create(tableContext, shard);
+    TableDescriptor tableDescriptor = tableContext.getDescriptor();
+    Path hdfsDirPath = shardContext.getHdfsDirPath();
 
     BlurLockFactory lockFactory = new BlurLockFactory(_configuration, hdfsDirPath, _nodeName, BlurConstants.getPid());
-
-    Directory directory = new HdfsDirectory(_configuration, hdfsDirPath);
+    Directory directory = shardContext.getDirectory();
     directory.setLockFactory(lockFactory);
     Directory dir = new BlockDirectory(table + "_" + shard, directory, _cache);
 
     BlurIndex index;
     if (tableDescriptor.isReadOnly()) {
       BlurIndexReader reader = new BlurIndexReader();
-      reader.setCloser(_closer);
-      reader.setAnalyzer(getAnalyzer(table));
+      reader.setContext(shardContext);
       reader.setDirectory(dir);
+      reader.setCloser(_closer);
       reader.setRefresher(_refresher);
-      reader.setShard(shard);
-      reader.setTable(table);
-      reader.setIndexDeletionPolicy(_indexDeletionPolicy);
       reader.init();
       index = reader;
     } else {
       BlurNRTIndex writer = new BlurNRTIndex();
-      writer.setAnalyzer(getAnalyzer(table));
+      writer.setContext(shardContext);
       writer.setDirectory(dir);
-      writer.setShard(shard);
-      writer.setTable(table);
-      writer.setTimeBetweenCommits(_timeBetweenCommits);
-      writer.setTimeBetweenRefreshs(_timeBetweenRefreshs);
-      writer.setWalPath(walTablePath);
-      writer.setConfiguration(_configuration);
-      writer.setIndexDeletionPolicy(_indexDeletionPolicy);
       writer.setCloser(_closer);
       writer.setGc(_gc);
       writer.init();
@@ -509,40 +489,45 @@ public class DistributedIndexServer extends AbstractIndexServer {
 
   private void warmUpAllSegments(IndexReader reader) throws IOException {
     LOG.warn("Warm up NOT supported yet.");
-    //Once the reader warm-up has been re-implemented, this code will change accordingly.
-    
-//    IndexReader[] indexReaders = reader.getSequentialSubReaders();
-//    if (indexReaders != null) {
-//      for (IndexReader r : indexReaders) {
-//        warmUpAllSegments(r);
-//      }
-//    }
-//    int maxDoc = reader.maxDoc();
-//    int numDocs = reader.numDocs();
-//    FieldInfos fieldInfos = ReaderUtil.getMergedFieldInfos(reader);
-//    Collection<String> fieldNames = new ArrayList<String>();
-//    for (FieldInfo fieldInfo : fieldInfos) {
-//      if (fieldInfo.isIndexed) {
-//        fieldNames.add(fieldInfo.name);
-//      }
-//    }
-//    int primeDocCount = reader.docFreq(BlurConstants.PRIME_DOC_TERM);
-//    TermDocs termDocs = reader.termDocs(BlurConstants.PRIME_DOC_TERM);
-//    termDocs.next();
-//    termDocs.close();
-//
-//    TermPositions termPositions = reader.termPositions(BlurConstants.PRIME_DOC_TERM);
-//    if (termPositions.next()) {
-//      if (termPositions.freq() > 0) {
-//        termPositions.nextPosition();
-//      }
-//    }
-//    termPositions.close();
-//    LOG.info("Warmup of indexreader [" + reader + "] complete, maxDocs [" + maxDoc + "], numDocs [" + numDocs + "], primeDocumentCount [" + primeDocCount + "], fieldCount ["
-//        + fieldNames.size() + "]");
-  }
-
-  private synchronized Map<String, BlurIndex> openMissingShards(final String table, Set<String> shardsToServe, final Map<String, BlurIndex> tableIndexes) {
+    // Once the reader warm-up has been re-implemented, this code will change
+    // accordingly.
+
+    // IndexReader[] indexReaders = reader.getSequentialSubReaders();
+    // if (indexReaders != null) {
+    // for (IndexReader r : indexReaders) {
+    // warmUpAllSegments(r);
+    // }
+    // }
+    // int maxDoc = reader.maxDoc();
+    // int numDocs = reader.numDocs();
+    // FieldInfos fieldInfos = ReaderUtil.getMergedFieldInfos(reader);
+    // Collection<String> fieldNames = new ArrayList<String>();
+    // for (FieldInfo fieldInfo : fieldInfos) {
+    // if (fieldInfo.isIndexed) {
+    // fieldNames.add(fieldInfo.name);
+    // }
+    // }
+    // int primeDocCount = reader.docFreq(BlurConstants.PRIME_DOC_TERM);
+    // TermDocs termDocs = reader.termDocs(BlurConstants.PRIME_DOC_TERM);
+    // termDocs.next();
+    // termDocs.close();
+    //
+    // TermPositions termPositions =
+    // reader.termPositions(BlurConstants.PRIME_DOC_TERM);
+    // if (termPositions.next()) {
+    // if (termPositions.freq() > 0) {
+    // termPositions.nextPosition();
+    // }
+    // }
+    // termPositions.close();
+    // LOG.info("Warmup of indexreader [" + reader + "] complete, maxDocs [" +
+    // maxDoc + "], numDocs [" + numDocs + "], primeDocumentCount [" +
+    // primeDocCount + "], fieldCount ["
+    // + fieldNames.size() + "]");
+  }
+
+  private synchronized Map<String, BlurIndex> openMissingShards(final String table, Set<String> shardsToServe, final Map<String, BlurIndex> tableIndexes) throws IOException {
+    final TableContext tableContext = getTableContext(table);
     Map<String, Future<BlurIndex>> opening = new HashMap<String, Future<BlurIndex>>();
     for (String s : shardsToServe) {
       final String shard = s;
@@ -552,7 +537,7 @@ public class DistributedIndexServer extends AbstractIndexServer {
         Future<BlurIndex> submit = _openerService.submit(new Callable<BlurIndex>() {
           @Override
           public BlurIndex call() throws Exception {
-            return openShard(table, shard);
+            return openShard(tableContext, table, shard);
           }
         });
         opening.put(shard, submit);
@@ -746,19 +731,8 @@ public class DistributedIndexServer extends AbstractIndexServer {
     _warmup = warmup;
   }
 
-  public void setIndexDeletionPolicy(IndexDeletionPolicy indexDeletionPolicy) {
-    _indexDeletionPolicy = indexDeletionPolicy;
-  }
-
-  public void setTimeBetweenCommits(long timeBetweenCommits) {
-    _timeBetweenCommits = timeBetweenCommits;
-  }
-
-  public void setTimeBetweenRefreshs(long timeBetweenRefreshs) {
-    _timeBetweenRefreshs = timeBetweenRefreshs;
-  }
-  
   public void setClusterName(String cluster) {
     _cluster = cluster;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f92d94a9/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java b/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java
deleted file mode 100644
index fd8335d..0000000
--- a/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java
+++ /dev/null
@@ -1,207 +0,0 @@
-package org.apache.blur.manager.indexserver;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import static org.apache.blur.lucene.LuceneVersionConstant.LUCENE_VERSION;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.blur.analysis.BlurAnalyzer;
-import org.apache.blur.log.Log;
-import org.apache.blur.log.LogFactory;
-import org.apache.blur.manager.writer.BlurIndex;
-import org.apache.blur.manager.writer.BlurIndexCloser;
-import org.apache.blur.manager.writer.BlurNRTIndex;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.lucene.analysis.core.KeywordAnalyzer;
-import org.apache.lucene.analysis.standard.StandardAnalyzer;
-import org.apache.lucene.analysis.util.CharArraySet;
-import org.apache.lucene.index.CorruptIndexException;
-import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.MMapDirectory;
-
-public class LocalIndexServer extends AbstractIndexServer {
-
-  private final static Log LOG = LogFactory.getLog(LocalIndexServer.class);
-
-  private Map<String, Map<String, BlurIndex>> _readersMap = new ConcurrentHashMap<String, Map<String, BlurIndex>>();
-  private File _localDir;
-  private BlurIndexCloser _closer;
-  private Path _walPath;
-  private Configuration _configuration = new Configuration();
-
-  public LocalIndexServer(File file, Path walPath) {
-    _localDir = file;
-    _localDir.mkdirs();
-    _closer = new BlurIndexCloser();
-    _closer.init();
-    _walPath = walPath;
-  }
-
-  @Override
-  public BlurAnalyzer getAnalyzer(String table) {
-    return new BlurAnalyzer(new StandardAnalyzer(LUCENE_VERSION, new CharArraySet(LUCENE_VERSION, new HashSet<String>(), false)));
-  }
-
-  @Override
-  public SortedSet<String> getShardListCurrentServerOnly(String table) throws IOException {
-    Map<String, BlurIndex> tableMap = _readersMap.get(table);
-    Set<String> shardsSet;
-    if (tableMap == null) {
-      shardsSet = getIndexes(table).keySet();
-    } else {
-      shardsSet = tableMap.keySet();
-    }
-    return new TreeSet<String>(shardsSet);
-  }
-
-  @Override
-  public Map<String, BlurIndex> getIndexes(String table) throws IOException {
-    Map<String, BlurIndex> tableMap = _readersMap.get(table);
-    if (tableMap == null) {
-      tableMap = openFromDisk(table);
-      _readersMap.put(table, tableMap);
-    }
-    return tableMap;
-  }
-
-  @Override
-  public void close() {
-    _closer.close();
-    for (String table : _readersMap.keySet()) {
-      close(_readersMap.get(table));
-    }
-  }
-
-  private void close(Map<String, BlurIndex> map) {
-    for (BlurIndex index : map.values()) {
-      try {
-        index.close();
-      } catch (Exception e) {
-        LOG.error("Error while trying to close index.", e);
-      }
-    }
-  }
-
-  private Map<String, BlurIndex> openFromDisk(String table) throws IOException {
-    File tableFile = new File(_localDir, table);
-    if (tableFile.isDirectory()) {
-      Map<String, BlurIndex> shards = new ConcurrentHashMap<String, BlurIndex>();
-      for (File f : tableFile.listFiles()) {
-        if (f.isDirectory()) {
-          MMapDirectory directory = new MMapDirectory(f);
-          if (!DirectoryReader.indexExists(directory)) {
-            new IndexWriter(directory, new IndexWriterConfig(LUCENE_VERSION, new KeywordAnalyzer())).close();
-          }
-          String shardName = f.getName();
-          shards.put(shardName, openIndex(table, shardName, directory));
-        }
-      }
-      return shards;
-    }
-    throw new IOException("Table [" + table + "] not found.");
-  }
-
-  private BlurIndex openIndex(String table, String shard, Directory dir) throws CorruptIndexException, IOException {
-    BlurNRTIndex index = new BlurNRTIndex();
-    index.setAnalyzer(getAnalyzer(table));
-    index.setDirectory(dir);
-    index.setShard(shard);
-    index.setTable(table);
-    index.setWalPath(new Path(new Path(_walPath, table), shard));
-    index.setConfiguration(_configuration);
-    index.setCloser(_closer);
-    index.setTimeBetweenRefreshs(25);
-    index.init();
-    return index;
-  }
-
-  @Override
-  public TABLE_STATUS getTableStatus(String table) {
-    return TABLE_STATUS.ENABLED;
-  }
-
-  @Override
-  public List<String> getShardList(String table) {
-    try {
-      List<String> result = new ArrayList<String>();
-      File tableFile = new File(_localDir, table);
-      if (tableFile.isDirectory()) {
-        for (File f : tableFile.listFiles()) {
-          if (f.isDirectory()) {
-            result.add(f.getName());
-          }
-        }
-      }
-      return result;
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public String getNodeName() {
-    return "localhost";
-  }
-
-  @Override
-  public String getTableUri(String table) {
-    return new File(_localDir, table).toURI().toString();
-  }
-
-  @Override
-  public int getShardCount(String table) {
-    return getShardList(table).size();
-  }
-
-  @Override
-  public long getTableSize(String table) throws IOException {
-    try {
-      File file = new File(new URI(getTableUri(table)));
-      return getFolderSize(file);
-    } catch (URISyntaxException e) {
-      throw new IOException("bad URI", e);
-    }
-  }
-
-  private long getFolderSize(File file) {
-    long size = 0;
-    if (file.isDirectory()) {
-      for (File sub : file.listFiles()) {
-        size += getFolderSize(sub);
-      }
-    } else {
-      size += file.length();
-    }
-    return size;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f92d94a9/src/blur-core/src/main/java/org/apache/blur/manager/writer/AbstractBlurIndex.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/writer/AbstractBlurIndex.java b/src/blur-core/src/main/java/org/apache/blur/manager/writer/AbstractBlurIndex.java
index a9fe542..41bb945 100644
--- a/src/blur-core/src/main/java/org/apache/blur/manager/writer/AbstractBlurIndex.java
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/writer/AbstractBlurIndex.java
@@ -24,32 +24,30 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.blur.analysis.BlurAnalyzer;
+import org.apache.blur.thrift.ShardContext;
+import org.apache.blur.thrift.TableContext;
+import org.apache.lucene.analysis.core.KeywordAnalyzer;
 import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.IndexDeletionPolicy;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
 import org.apache.lucene.index.TieredMergePolicy;
 import org.apache.lucene.store.Directory;
 
 public abstract class AbstractBlurIndex extends BlurIndex {
 
-  private BlurAnalyzer _analyzer;
   private BlurIndexCloser _closer;
   private Directory _directory;
-  private IndexDeletionPolicy _indexDeletionPolicy = new KeepOnlyLastCommitDeletionPolicy();
   private AtomicReference<DirectoryReader> _indexReaderRef = new AtomicReference<DirectoryReader>();
   private AtomicBoolean _isClosed = new AtomicBoolean(false);
   private AtomicBoolean _open = new AtomicBoolean();
   private BlurIndexRefresher _refresher;
-  private String _shard;
-  private String _table;
+  private ShardContext context;
 
   protected IndexWriterConfig initIndexWriterConfig() {
-    IndexWriterConfig conf = new IndexWriterConfig(LUCENE_VERSION, _analyzer);
+    IndexWriterConfig conf = new IndexWriterConfig(LUCENE_VERSION, new KeywordAnalyzer());
     conf.setWriteLockTimeout(TimeUnit.MINUTES.toMillis(5));
-    conf.setIndexDeletionPolicy(_indexDeletionPolicy);
+    TableContext tableContext = context.getTableContext();
+    conf.setIndexDeletionPolicy(tableContext.getIndexDeletionPolicy());
     TieredMergePolicy mergePolicy = (TieredMergePolicy) conf.getMergePolicy();
     mergePolicy.setUseCompoundFile(false);
     _open.set(true);
@@ -105,10 +103,6 @@ public abstract class AbstractBlurIndex extends BlurIndex {
     return _isClosed;
   }
 
-  public void setAnalyzer(BlurAnalyzer analyzer) {
-    _analyzer = analyzer;
-  }
-
   public void setCloser(BlurIndexCloser closer) {
     _closer = closer;
   }
@@ -117,39 +111,23 @@ public abstract class AbstractBlurIndex extends BlurIndex {
     _directory = directory;
   }
 
-  public void setIndexDeletionPolicy(IndexDeletionPolicy indexDeletionPolicy) {
-    _indexDeletionPolicy = indexDeletionPolicy;
-  }
-
   public void setRefresher(BlurIndexRefresher refresher) {
     _refresher = refresher;
   }
 
-  public void setShard(String shard) {
-    this._shard = shard;
-  }
-
-  public void setTable(String table) {
-    this._table = table;
-  }
-
-  protected BlurAnalyzer getAnalyzer() {
-    return _analyzer;
-  }
-
   protected Directory getDirectory() {
     return _directory;
   }
 
-  protected String getShard() {
-    return _shard;
+  protected boolean isOpen() {
+    return _open.get();
   }
-
-  protected String getTable() {
-    return _table;
+  
+  public ShardContext getContext() {
+    return context;
   }
 
-  protected boolean isOpen() {
-    return _open.get();
+  public void setContext(ShardContext context) {
+    this.context = context;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f92d94a9/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java b/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
index ef27d73..2e5105a 100644
--- a/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
@@ -17,11 +17,11 @@ package org.apache.blur.manager.writer;
  * limitations under the License.
  */
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.blur.thrift.generated.Document;
+import org.apache.blur.thrift.generated.Query;
 import org.apache.blur.thrift.generated.Term;
 import org.apache.blur.thrift.generated.UpdatePackage;
 import org.apache.lucene.index.IndexReader;
@@ -36,7 +36,7 @@ public abstract class BlurIndex {
 
   public abstract long addDocuments(boolean waitToBeVisible, boolean wal, List<Document> documents) throws IOException;
   public abstract long deleteDocuments(boolean waitToBeVisible, boolean wal, Term... deleteTerms) throws IOException;
-  public abstract long deleteDocuments(boolean waitToBeVisible, boolean wal, ByteBuffer... deleteQueries) throws IOException;
+  public abstract long deleteDocuments(boolean waitToBeVisible, boolean wal, Query... deleteQueries) throws IOException;
   public abstract long updateDocuments(boolean waitToBeVisible, boolean wal, List<UpdatePackage> updatePackages) throws IOException;
   public abstract void blockUntilGenerationIsVisible(long generation, boolean forceRefresh) throws IOException;
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f92d94a9/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java b/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java
index 88df8ec..d437c48 100644
--- a/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java
@@ -19,13 +19,14 @@ package org.apache.blur.manager.writer;
 import static org.apache.blur.lucene.LuceneVersionConstant.LUCENE_VERSION;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.List;
 
 import org.apache.blur.index.IndexWriter;
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
+import org.apache.blur.thrift.TableContext;
 import org.apache.blur.thrift.generated.Document;
+import org.apache.blur.thrift.generated.Query;
 import org.apache.blur.thrift.generated.Term;
 import org.apache.blur.thrift.generated.UpdatePackage;
 import org.apache.lucene.analysis.core.KeywordAnalyzer;
@@ -56,7 +57,8 @@ public class BlurIndexReader extends AbstractBlurIndex {
   @Override
   public void close() throws IOException {
     super.close();
-    LOG.info("Reader for table [{0}] shard [{1}] closed.", getTable(), getShard());
+    TableContext tableContext = getContext().getTableContext();
+    LOG.info("Reader for table [{0}] shard [{1}] closed.", tableContext.getTable(), getContext().getShard());
   }
 
   @Override
@@ -75,8 +77,8 @@ public class BlurIndexReader extends AbstractBlurIndex {
   }
 
   @Override
-  public long deleteDocuments(boolean waitToBeVisible, boolean wal, ByteBuffer... deleteQueries) throws IOException {
-    throw new RuntimeException("Read-only shard"); 
+  public long deleteDocuments(boolean waitToBeVisible, boolean wal, Query... deleteQueries) throws IOException {
+    throw new RuntimeException("Read-only shard");
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f92d94a9/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java b/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
index 397573c..6b49938 100644
--- a/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
@@ -19,26 +19,24 @@ package org.apache.blur.manager.writer;
 import static org.apache.blur.lucene.LuceneVersionConstant.LUCENE_VERSION;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.blur.analysis.BlurAnalyzer;
 import org.apache.blur.index.IndexWriter;
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
 import org.apache.blur.lucene.store.refcounter.DirectoryReferenceCounter;
 import org.apache.blur.lucene.store.refcounter.DirectoryReferenceFileGC;
+import org.apache.blur.thrift.ShardContext;
+import org.apache.blur.thrift.TableContext;
 import org.apache.blur.thrift.generated.Document;
+import org.apache.blur.thrift.generated.Query;
 import org.apache.blur.thrift.generated.Term;
 import org.apache.blur.thrift.generated.UpdatePackage;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.lucene.codecs.appending.AppendingCodec;
 import org.apache.lucene.index.CorruptIndexException;
-import org.apache.lucene.index.IndexDeletionPolicy;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.index.TieredMergePolicy;
@@ -47,7 +45,6 @@ import org.apache.lucene.search.NRTManager;
 import org.apache.lucene.search.NRTManager.TrackingIndexWriter;
 import org.apache.lucene.search.NRTManagerReopenThread;
 import org.apache.lucene.search.SearcherFactory;
-import org.apache.lucene.search.similarities.Similarity;
 import org.apache.lucene.store.Directory;
 
 public class BlurNRTIndex extends BlurIndex {
@@ -59,62 +56,41 @@ public class BlurNRTIndex extends BlurIndex {
   private AtomicBoolean _isClosed = new AtomicBoolean();
   private IndexWriter _writer;
   private Thread _committer;
+  private SearcherFactory _searcherFactory = new SearcherFactory();
+  private AtomicReference<IndexReader> _indexRef = new AtomicReference<IndexReader>();
+  private long _lastRefresh;
+  private long _timeBetweenRefreshsNanos;
 
   // externally set
-  private BlurAnalyzer _analyzer;
   private Directory _directory;
-  private String _table;
-  private String _shard;
-  private Similarity _similarity;
   private NRTManagerReopenThread _refresher;
-  private TransactionRecorder _recorder;
-  private Configuration _configuration;
-  private Path _walPath;
-  private IndexDeletionPolicy _indexDeletionPolicy;
+
   private BlurIndexCloser _closer;
-  private AtomicReference<IndexReader> _indexRef = new AtomicReference<IndexReader>();
-  private long _timeBetweenCommits = TimeUnit.SECONDS.toMillis(60);
-  private long _timeBetweenRefreshs = TimeUnit.MILLISECONDS.toMillis(5000);
   private DirectoryReferenceFileGC _gc;
-  private TrackingIndexWriter _trackingWriter;
-  private SearcherFactory _searcherFactory = new SearcherFactory();
-  private long _lastRefresh;
-  private long _timeBetweenRefreshsNanos;
+  private TableContext tableContext;
+  private ShardContext shardContext;
 
-  // private SearcherWarmer _warmer = new SearcherWarmer() {
-  // @Override
-  // public void warm(IndexSearcher s) throws IOException {
-  // IndexReader indexReader = s.getIndexReader();
-  // IndexReader[] subReaders = indexReader.getSequentialSubReaders();
-  // if (subReaders == null) {
-  // PrimeDocCache.getPrimeDocBitSet(indexReader);
-  // } else {
-  // for (IndexReader reader : subReaders) {
-  // PrimeDocCache.getPrimeDocBitSet(reader);
-  // }
-  // }
-  // }
-  // };
+  // created
+  private TransactionRecorder _recorder;
+  private TrackingIndexWriter _trackingWriter;
 
   public void init() throws IOException {
-    Path walTablePath = new Path(_walPath, _table);
-    Path walShardPath = new Path(walTablePath, _shard);
+    tableContext = shardContext.getTableContext();
 
-    _timeBetweenRefreshsNanos = TimeUnit.MILLISECONDS.toNanos(_timeBetweenRefreshs);
+    _timeBetweenRefreshsNanos = TimeUnit.MILLISECONDS.toNanos(tableContext.getTimeBetweenRefreshs());
 
-    IndexWriterConfig conf = new IndexWriterConfig(LUCENE_VERSION, _analyzer);
+    IndexWriterConfig conf = new IndexWriterConfig(LUCENE_VERSION, tableContext.getAnalyzer());
     conf.setWriteLockTimeout(TimeUnit.MINUTES.toMillis(5));
-    conf.setSimilarity(_similarity);
-    conf.setIndexDeletionPolicy(_indexDeletionPolicy);
+    conf.setSimilarity(tableContext.getSimilarity());
+    conf.setIndexDeletionPolicy(tableContext.getIndexDeletionPolicy());
     conf.setCodec(new AppendingCodec());
     TieredMergePolicy mergePolicy = (TieredMergePolicy) conf.getMergePolicy();
     mergePolicy.setUseCompoundFile(false);
     DirectoryReferenceCounter referenceCounter = new DirectoryReferenceCounter(_directory, _gc);
     _writer = new IndexWriter(referenceCounter, conf);
     _recorder = new TransactionRecorder();
-    _recorder.setAnalyzer(_analyzer);
-    _recorder.setConfiguration(_configuration);
-    _recorder.setWalPath(walShardPath);
+    _recorder.setContext(shardContext);
+
     _recorder.init();
     _recorder.replay(_writer);
 
@@ -142,7 +118,7 @@ public class BlurNRTIndex extends BlurIndex {
   }
 
   @Override
-  public long deleteDocuments(boolean waitToBeVisible, boolean wal, ByteBuffer... deleteQueries) throws IOException {
+  public long deleteDocuments(boolean waitToBeVisible, boolean wal, Query... deleteQueries) throws IOException {
     long generation = _recorder.deleteDocuments(wal, deleteQueries, _trackingWriter);
     waitToBeVisible(waitToBeVisible, generation);
     return generation;
@@ -227,46 +203,10 @@ public class BlurNRTIndex extends BlurIndex {
     _closer.close(oldIndexReader);
   }
 
-  public void setAnalyzer(BlurAnalyzer analyzer) {
-    _analyzer = analyzer;
-  }
-
   public void setDirectory(Directory directory) {
     _directory = directory;
   }
 
-  public void setTable(String table) {
-    _table = table;
-  }
-
-  public void setShard(String shard) {
-    _shard = shard;
-  }
-
-  public void setSimilarity(Similarity similarity) {
-    _similarity = similarity;
-  }
-
-  public void setTimeBetweenCommits(long timeBetweenCommits) {
-    _timeBetweenCommits = timeBetweenCommits;
-  }
-
-  public void setTimeBetweenRefreshs(long timeBetweenRefreshs) {
-    _timeBetweenRefreshs = timeBetweenRefreshs;
-  }
-
-  public void setWalPath(Path walPath) {
-    _walPath = walPath;
-  }
-
-  public void setConfiguration(Configuration configuration) {
-    _configuration = configuration;
-  }
-
-  public void setIndexDeletionPolicy(IndexDeletionPolicy indexDeletionPolicy) {
-    _indexDeletionPolicy = indexDeletionPolicy;
-  }
-
   public void setCloser(BlurIndexCloser closer) {
     _closer = closer;
   }
@@ -280,9 +220,9 @@ public class BlurNRTIndex extends BlurIndex {
   }
 
   private void startRefresher() {
-    double targetMinStaleSec = _timeBetweenRefreshs / 1000.0;
+    double targetMinStaleSec = tableContext.getTimeBetweenRefreshs() / 1000.0;
     _refresher = new NRTManagerReopenThread(_nrtManager, targetMinStaleSec * 10, targetMinStaleSec);
-    _refresher.setName("Refresh Thread [" + _table + "/" + _shard + "]");
+    _refresher.setName("Refresh Thread [" + tableContext.getTable() + "/" + shardContext.getShard() + "]");
     _refresher.setDaemon(true);
     _refresher.start();
   }
@@ -293,27 +233,35 @@ public class BlurNRTIndex extends BlurIndex {
       public void run() {
         while (!_isClosed.get()) {
           try {
-            LOG.info("Committing of [{0}/{1}].", _table, _shard);
+            LOG.info("Committing of [{0}/{1}].", tableContext.getTable(), shardContext.getShard());
             _recorder.commit(_writer);
           } catch (CorruptIndexException e) {
-            LOG.error("Curruption Error during commit of [{0}/{1}].", e, _table, _shard);
+            LOG.error("Curruption Error during commit of [{0}/{1}].", e, tableContext.getTable(), shardContext.getShard());
           } catch (IOException e) {
-            LOG.error("IO Error during commit of [{0}/{1}].", e, _table, _shard);
+            LOG.error("IO Error during commit of [{0}/{1}].", e, tableContext.getTable(), shardContext.getShard());
           }
           try {
-            Thread.sleep(_timeBetweenCommits);
+            Thread.sleep(tableContext.getTimeBetweenCommits());
           } catch (InterruptedException e) {
             if (_isClosed.get()) {
               return;
             }
-            LOG.error("Unknown error with committer thread [{0}/{1}].", e, _table, _shard);
+            LOG.error("Unknown error with committer thread [{0}/{1}].", e, tableContext.getTable(), shardContext.getShard());
           }
         }
       }
     });
     _committer.setDaemon(true);
-    _committer.setName("Commit Thread [" + _table + "/" + _shard + "]");
+    _committer.setName("Commit Thread [" + tableContext.getTable() + "/" + shardContext.getShard() + "]");
     _committer.start();
   }
 
+  public ShardContext getContext() {
+    return shardContext;
+  }
+
+  public void setContext(ShardContext context) {
+    this.shardContext = context;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f92d94a9/src/blur-core/src/main/java/org/apache/blur/manager/writer/TransactionRecorder.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/writer/TransactionRecorder.java b/src/blur-core/src/main/java/org/apache/blur/manager/writer/TransactionRecorder.java
index 4901be3..978add3 100644
--- a/src/blur-core/src/main/java/org/apache/blur/manager/writer/TransactionRecorder.java
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/writer/TransactionRecorder.java
@@ -22,7 +22,6 @@ import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.EOFException;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -30,26 +29,24 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.blur.analysis.BlurAnalyzer;
 import org.apache.blur.index.IndexWriter;
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
+import org.apache.blur.thrift.ShardContext;
+import org.apache.blur.thrift.TableContext;
 import org.apache.blur.thrift.generated.Document;
+import org.apache.blur.thrift.generated.Query;
 import org.apache.blur.thrift.generated.Term;
 import org.apache.blur.thrift.generated.UpdatePackage;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.lucene.document.FieldType;
 import org.apache.lucene.index.CorruptIndexException;
 import org.apache.lucene.search.NRTManager.TrackingIndexWriter;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TCompactProtocol;
-import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.transport.TIOStreamTransport;
-import org.apache.thrift.transport.TTransport;
 
 public class TransactionRecorder {
 
@@ -94,23 +91,27 @@ public class TransactionRecorder {
   }
 
   private AtomicBoolean running = new AtomicBoolean(true);
-  private Path walPath;
-  private Configuration configuration;
   private FileSystem fileSystem;
   private AtomicReference<FSDataOutputStream> outputStream = new AtomicReference<FSDataOutputStream>();
   private AtomicLong lastSync = new AtomicLong();
   private long timeBetweenSyncs = TimeUnit.MILLISECONDS.toNanos(10);
-  private BlurAnalyzer analyzer;
+  private ShardContext shardContext;
+  private TableContext tableContext;
+
+  public void setContext(ShardContext context) {
+    this.shardContext = context;
+  }
 
   public void init() throws IOException {
-    fileSystem = walPath.getFileSystem(configuration);
+    tableContext = shardContext.getTableContext();
+    fileSystem = shardContext.getWalShardPath().getFileSystem(tableContext.getConfiguration());
   }
 
   public void open() throws IOException {
-    if (fileSystem.exists(walPath)) {
-      throw new IOException("WAL path [" + walPath + "] still exists, replay must have not worked.");
+    if (fileSystem.exists(shardContext.getWalShardPath())) {
+      throw new IOException("WAL path [" + shardContext.getWalShardPath() + "] still exists, replay must have not worked.");
     } else {
-      outputStream.set(fileSystem.create(walPath));
+      outputStream.set(fileSystem.create(shardContext.getWalShardPath()));
     }
     if (outputStream == null) {
       throw new RuntimeException();
@@ -119,8 +120,8 @@ public class TransactionRecorder {
   }
 
   public void replay(IndexWriter writer) throws IOException {
-    if (fileSystem.exists(walPath)) {
-      FSDataInputStream inputStream = fileSystem.open(walPath);
+    if (fileSystem.exists(shardContext.getWalShardPath())) {
+      FSDataInputStream inputStream = fileSystem.open(shardContext.getWalShardPath());
       replay(writer, inputStream);
       inputStream.close();
       commit(writer);
@@ -153,10 +154,12 @@ public class TransactionRecorder {
         }
         break;
       case DELETE_QUERY:
-        ByteBuffer[] deleteQueries = readDeleteQueriesFromWal(dataInputStream);
-        writer.deleteDocuments(toLucene(deleteQueries));
-        deleteQueriesCount += deleteQueries.length;
-        break;
+        throw new RuntimeException("not supported");
+        // ByteBuffer[] deleteQueries =
+        // readDeleteQueriesFromWal(dataInputStream);
+        // writer.deleteDocuments(toLucene(deleteQueries));
+        // deleteQueriesCount += deleteQueries.length;
+        // break;
       case DELETE_TERM:
         Term[] deleteTerms = readDeleteTermsFromWal(dataInputStream);
         writer.deleteDocuments(toLucene(deleteTerms));
@@ -194,11 +197,11 @@ public class TransactionRecorder {
 
   }
 
-  private ByteBuffer[] readDeleteQueriesFromWal(DataInputStream in) {
+  private Query[] readDeleteQueriesFromWal(DataInputStream in) {
     return null;
   }
 
-  private void writeDeleteQueriesToWal(ByteBuffer[] deleteQueries) {
+  private void writeDeleteQueriesToWal(Query[] deleteQueries) {
 
   }
 
@@ -260,12 +263,12 @@ public class TransactionRecorder {
   }
 
   private void rollLog() throws IOException {
-    LOG.info("Rolling WAL path [" + walPath + "]");
+    LOG.info("Rolling WAL path [" + shardContext.getWalShardPath() + "]");
     FSDataOutputStream os = outputStream.get();
     if (os != null) {
       os.close();
     }
-    fileSystem.delete(walPath, false);
+    fileSystem.delete(shardContext.getWalShardPath(), false);
     open();
   }
 
@@ -276,14 +279,6 @@ public class TransactionRecorder {
     outputStream.get().close();
   }
 
-  public void setWalPath(Path walPath) {
-    this.walPath = walPath;
-  }
-
-  public void setConfiguration(Configuration configuration) {
-    this.configuration = configuration;
-  }
-
   public void commit(IndexWriter writer) throws CorruptIndexException, IOException {
     synchronized (running) {
       long s = System.nanoTime();
@@ -296,10 +291,6 @@ public class TransactionRecorder {
     }
   }
 
-  public void setAnalyzer(BlurAnalyzer analyzer) {
-    this.analyzer = analyzer;
-  }
-
   public long addDocuments(boolean wal, List<org.apache.blur.thrift.generated.Document> documents, TrackingIndexWriter writer) throws IOException {
     if (wal) {
       writeAddDocumentsToWal(documents);
@@ -314,11 +305,15 @@ public class TransactionRecorder {
     return writer.deleteDocuments(toLucene(deleteTerms));
   }
 
-  public long deleteDocuments(boolean wal, ByteBuffer[] deleteQueries, TrackingIndexWriter writer) throws IOException {
+  public long deleteDocuments(boolean wal, Query[] deleteQueries, TrackingIndexWriter writer) throws IOException {
+    org.apache.lucene.search.Query[] queryArray = new org.apache.lucene.search.Query[deleteQueries.length];
+    for (int i = 0; i < deleteQueries.length; i++) {
+      queryArray[i] = tableContext.getQueryConverter().convert(deleteQueries[i]);
+    }
     if (wal) {
       writeDeleteQueriesToWal(deleteQueries);
     }
-    return writer.deleteDocuments(toLucene(deleteQueries));
+    return writer.deleteDocuments(queryArray);
   }
 
   public long updateDocuments(boolean wal, List<UpdatePackage> updatePackages, TrackingIndexWriter writer) throws IOException {
@@ -331,5 +326,4 @@ public class TransactionRecorder {
     }
     return generation;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f92d94a9/src/blur-core/src/main/java/org/apache/blur/thrift/BlurServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/BlurServer.java b/src/blur-core/src/main/java/org/apache/blur/thrift/BlurServer.java
index ea87ba8..3c5ae36 100644
--- a/src/blur-core/src/main/java/org/apache/blur/thrift/BlurServer.java
+++ b/src/blur-core/src/main/java/org/apache/blur/thrift/BlurServer.java
@@ -19,12 +19,9 @@ package org.apache.blur.thrift;
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_CACHE_MAX_QUERYCACHE_ELEMENTS;
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_CACHE_MAX_TIMETOLIVE;
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_DATA_FETCH_THREAD_COUNT;
-import static org.apache.blur.utils.ThriftLuceneConversion.setShardIndexTopDocs;
-import static org.apache.blur.utils.ThriftLuceneConversion.setShardIndexTopFieldDocs;
 import static org.apache.blur.utils.ThriftLuceneConversion.toThrift;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -45,6 +42,7 @@ import org.apache.blur.analysis.BlurAnalyzer;
 import org.apache.blur.concurrent.Executors;
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
+import org.apache.blur.lucene.search.QueryConverter;
 import org.apache.blur.manager.IndexServer;
 import org.apache.blur.manager.writer.BlurIndex;
 import org.apache.blur.thrift.BlurServer.SearchAction.ACTION;
@@ -57,6 +55,7 @@ import org.apache.blur.thrift.generated.Document;
 import org.apache.blur.thrift.generated.Generation;
 import org.apache.blur.thrift.generated.LiveSchema;
 import org.apache.blur.thrift.generated.MutateOptions;
+import org.apache.blur.thrift.generated.Query;
 import org.apache.blur.thrift.generated.QueryArgs;
 import org.apache.blur.thrift.generated.QueryStatus;
 import org.apache.blur.thrift.generated.Session;
@@ -70,11 +69,7 @@ import org.apache.blur.utils.BlurConstants;
 import org.apache.blur.utils.BlurUtil;
 import org.apache.blur.utils.ThriftLuceneConversion;
 import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.search.Filter;
 import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.Query;
-import org.apache.lucene.search.ScoreDoc;
-import org.apache.lucene.search.Sort;
 import org.apache.thrift.TException;
 
 public class BlurServer extends TableAdmin implements Iface {
@@ -215,10 +210,14 @@ public class BlurServer extends TableAdmin implements Iface {
       Collection<SearchAction> searchersToSearch = getSearchActions(tableDescriptor, shardIndexes, searchers);
 
       List<Future<TopFieldDocs>> futures = new ArrayList<Future<TopFieldDocs>>(searchersToSearch.size());
-      Query query = ThriftLuceneConversion.toLuceneQuery(queryArgs.query);
-      Filter filter = ThriftLuceneConversion.toLuceneFilter(queryArgs);
-      Sort sort = ThriftLuceneConversion.toLuceneSort(queryArgs);
-      ScoreDoc after = ThriftLuceneConversion.toLucene(queryArgs.getAfter());
+      
+      TableContext context = _indexServer.getTableContext(session.getTableName());
+      QueryConverter queryConverter = context.getQueryConverter();
+
+      org.apache.lucene.search.Query query = queryConverter.convert(queryArgs.getQuery());
+      org.apache.lucene.search.Filter filter = ThriftLuceneConversion.toLuceneFilter(queryArgs);
+      org.apache.lucene.search.Sort sort = ThriftLuceneConversion.toLuceneSort(queryArgs);
+      org.apache.lucene.search.ScoreDoc after = ThriftLuceneConversion.toLucene(queryArgs.getAfter());
       boolean doDocScores = queryArgs.isDoDocScores();
       boolean doMaxScore = queryArgs.isDoMaxScore();
       int numberToFetch = queryArgs.getNumberToFetch();
@@ -261,57 +260,6 @@ public class BlurServer extends TableAdmin implements Iface {
     }
   }
 
-  static class SearchCallable implements Callable<TopFieldDocs> {
-    private final ScoreDoc after;
-    private final Sort sort;
-    private final Filter filter;
-    private final Query query;
-    private final IndexSearcher searcher;
-    private final int count;
-    private final boolean doDocScores;
-    private final boolean doMaxScore;
-    private final int shardIndex;
-
-    SearchCallable(int shardIndex, IndexSearcher searcher, ScoreDoc after, Query query, Filter filter, Sort sort, int count, boolean doDocScores, boolean doMaxScore) {
-      this.after = after;
-      this.searcher = searcher;
-      this.query = query;
-      this.filter = filter;
-      this.sort = sort;
-      this.count = count;
-      this.doDocScores = doDocScores;
-      this.doMaxScore = doMaxScore;
-      this.shardIndex = shardIndex;
-    }
-
-    @Override
-    public TopFieldDocs call() throws Exception {
-      return addShardIndex(doSearch());
-    }
-
-    private TopFieldDocs addShardIndex(TopFieldDocs topFieldDocs) {
-      topFieldDocs.setShardIndex(shardIndex);
-      return topFieldDocs;
-    }
-
-    private TopFieldDocs doSearch() throws IOException {
-      if (after == null) {
-        if (sort == null) {
-          return toThrift(setShardIndexTopDocs(shardIndex, searcher.search(query, filter, count)));
-        } else {
-          return toThrift(setShardIndexTopFieldDocs(shardIndex, searcher.search(query, filter, count, sort, doDocScores, doMaxScore)));
-        }
-      } else {
-        if (sort == null) {
-          return toThrift(setShardIndexTopDocs(shardIndex, searcher.searchAfter(after, query, filter, count)));
-        } else {
-          return toThrift(setShardIndexTopFieldDocs(shardIndex,
-              (org.apache.lucene.search.TopFieldDocs) searcher.searchAfter(after, query, filter, count, sort, doDocScores, doMaxScore)));
-        }
-      }
-    }
-  }
-
   static class SearchAction {
     enum ACTION {
       LOCAL, REMOTE
@@ -475,7 +423,7 @@ public class BlurServer extends TableAdmin implements Iface {
   }
 
   @Override
-  public List<Generation> deleteDocumentsByQueries(MutateOptions options, List<ByteBuffer> queries) throws BlurException, TException {
+  public List<Generation> deleteDocumentsByQueries(MutateOptions options, List<org.apache.blur.thrift.generated.Query> queries) throws BlurException, TException {
     String table = options.getTable();
     int shardIndex = getShardIndex(options);
     boolean waitToBeVisible = options.isWaitToBeVisible();
@@ -486,7 +434,7 @@ public class BlurServer extends TableAdmin implements Iface {
       if (index == null) {
         generations.addAll(forwardDeleteDocumentsByQueries(options, queries));
       } else {
-        long generation = index.deleteDocuments(waitToBeVisible, writeAheadLog, queries.toArray(new ByteBuffer[queries.size()]));
+        long generation = index.deleteDocuments(waitToBeVisible, writeAheadLog, queries.toArray(new Query[queries.size()]));
         generations.add(new Generation(table, shardIndex, generation));
       }
       return generations;
@@ -609,7 +557,8 @@ public class BlurServer extends TableAdmin implements Iface {
     });
   }
 
-  private List<Generation> forwardDeleteDocumentsByQueries(final MutateOptions options, final List<ByteBuffer> queries) throws BlurException, TException, IOException {
+  private List<Generation> forwardDeleteDocumentsByQueries(final MutateOptions options, final List<org.apache.blur.thrift.generated.Query> queries) throws BlurException,
+      TException, IOException {
     String table = options.getTable();
     int shardIndex = options.getShardIndex();
     Connection connection = getConnection(table, shardIndex);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f92d94a9/src/blur-core/src/main/java/org/apache/blur/thrift/Configurable.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/Configurable.java b/src/blur-core/src/main/java/org/apache/blur/thrift/Configurable.java
new file mode 100644
index 0000000..77fec2f
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/thrift/Configurable.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.thrift;
+
+public interface Configurable {
+
+  public void setTableContext(TableContext context);
+
+  public TableContext getTableContext();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f92d94a9/src/blur-core/src/main/java/org/apache/blur/thrift/Configured.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/Configured.java b/src/blur-core/src/main/java/org/apache/blur/thrift/Configured.java
new file mode 100644
index 0000000..9d4b70a
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/thrift/Configured.java
@@ -0,0 +1,26 @@
+package org.apache.blur.thrift;
+
+
+public abstract class Configured implements Configurable {
+
+  private TableContext context;
+
+  public Configured() {
+    this(null);
+  }
+
+  public Configured(TableContext context) {
+    this.context = context;
+  }
+
+  @Override
+  public TableContext getTableContext() {
+    return context;
+  }
+
+  @Override
+  public void setTableContext(TableContext context) {
+    this.context = context;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f92d94a9/src/blur-core/src/main/java/org/apache/blur/thrift/SearchCallable.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/SearchCallable.java b/src/blur-core/src/main/java/org/apache/blur/thrift/SearchCallable.java
new file mode 100644
index 0000000..6c58883
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/thrift/SearchCallable.java
@@ -0,0 +1,66 @@
+package org.apache.blur.thrift;
+
+import static org.apache.blur.utils.ThriftLuceneConversion.setShardIndexTopDocs;
+import static org.apache.blur.utils.ThriftLuceneConversion.setShardIndexTopFieldDocs;
+import static org.apache.blur.utils.ThriftLuceneConversion.toThrift;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+
+import org.apache.blur.thrift.generated.TopFieldDocs;
+import org.apache.lucene.search.Filter;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.ScoreDoc;
+import org.apache.lucene.search.Sort;
+
+public class SearchCallable implements Callable<TopFieldDocs> {
+  private final ScoreDoc after;
+  private final Sort sort;
+  private final Filter filter;
+  private final Query query;
+  private final IndexSearcher searcher;
+  private final int count;
+  private final boolean doDocScores;
+  private final boolean doMaxScore;
+  private final int shardIndex;
+
+  SearchCallable(int shardIndex, IndexSearcher searcher, ScoreDoc after, Query query, Filter filter, Sort sort, int count, boolean doDocScores, boolean doMaxScore) {
+    this.after = after;
+    this.searcher = searcher;
+    this.query = query;
+    this.filter = filter;
+    this.sort = sort;
+    this.count = count;
+    this.doDocScores = doDocScores;
+    this.doMaxScore = doMaxScore;
+    this.shardIndex = shardIndex;
+  }
+
+  @Override
+  public TopFieldDocs call() throws Exception {
+    return addShardIndex(doSearch());
+  }
+
+  private TopFieldDocs addShardIndex(TopFieldDocs topFieldDocs) {
+    topFieldDocs.setShardIndex(shardIndex);
+    return topFieldDocs;
+  }
+
+  private TopFieldDocs doSearch() throws IOException {
+    if (after == null) {
+      if (sort == null) {
+        return toThrift(setShardIndexTopDocs(shardIndex, searcher.search(query, filter, count)));
+      } else {
+        return toThrift(setShardIndexTopFieldDocs(shardIndex, searcher.search(query, filter, count, sort, doDocScores, doMaxScore)));
+      }
+    } else {
+      if (sort == null) {
+        return toThrift(setShardIndexTopDocs(shardIndex, searcher.searchAfter(after, query, filter, count)));
+      } else {
+        return toThrift(setShardIndexTopFieldDocs(shardIndex,
+            (org.apache.lucene.search.TopFieldDocs) searcher.searchAfter(after, query, filter, count, sort, doDocScores, doMaxScore)));
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f92d94a9/src/blur-core/src/main/java/org/apache/blur/thrift/ShardContext.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/ShardContext.java b/src/blur-core/src/main/java/org/apache/blur/thrift/ShardContext.java
new file mode 100644
index 0000000..48709b9
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/thrift/ShardContext.java
@@ -0,0 +1,71 @@
+package org.apache.blur.thrift;
+
+import java.io.IOException;
+
+import org.apache.blur.store.hdfs.HdfsDirectory;
+import org.apache.hadoop.fs.Path;
+import org.apache.lucene.store.Directory;
+
+public class ShardContext {
+
+  private String shard;
+  private Path walShardPath;
+  private Path hdfsDirPath;
+  private Directory directory;
+  private TableContext tableContext;
+
+  public TableContext getTableContext() {
+    return tableContext;
+  }
+
+  public void setTableContext(TableContext tableContext) {
+    this.tableContext = tableContext;
+  }
+
+  protected ShardContext() {
+
+  }
+
+  public Directory getDirectory() {
+    return directory;
+  }
+
+  public void setDirectory(Directory directory) {
+    this.directory = directory;
+  }
+
+  public Path getHdfsDirPath() {
+    return hdfsDirPath;
+  }
+
+  public void setHdfsDirPath(Path hdfsDirPath) {
+    this.hdfsDirPath = hdfsDirPath;
+  }
+
+  public String getShard() {
+    return shard;
+  }
+
+  public void setShard(String shard) {
+    this.shard = shard;
+  }
+
+  public Path getWalShardPath() {
+    return walShardPath;
+  }
+
+  public void setWalShardPath(Path walShardPath) {
+    this.walShardPath = walShardPath;
+  }
+
+  public static ShardContext create(TableContext tableContext, String shard) throws IOException {
+    ShardContext shardContext = new ShardContext();
+    shardContext.tableContext = tableContext;
+    shardContext.walShardPath = new Path(tableContext.getWalTablePath(), shard);
+    shardContext.hdfsDirPath = new Path(tableContext.getTablePath(), shard);
+    shardContext.shard = shard;
+    shardContext.directory = new HdfsDirectory(tableContext.getConfiguration(), shardContext.hdfsDirPath);
+    return shardContext;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f92d94a9/src/blur-core/src/main/java/org/apache/blur/thrift/TableContext.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/TableContext.java b/src/blur-core/src/main/java/org/apache/blur/thrift/TableContext.java
new file mode 100644
index 0000000..93db181
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/thrift/TableContext.java
@@ -0,0 +1,176 @@
+package org.apache.blur.thrift;
+
+import static org.apache.blur.utils.BlurConstants.BLUR_LUCENE_INDEX_DELETION_POLICY_CLASS;
+import static org.apache.blur.utils.BlurConstants.BLUR_LUCENE_INDEX_SIMILARITY_CLASS;
+import static org.apache.blur.utils.BlurConstants.BLUR_QUERY_CONVERTER_CLASS;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_TIME_BETWEEN_COMMITS;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_TIME_BETWEEN_REFRESHS;
+
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.blur.analysis.BlurAnalyzer;
+import org.apache.blur.lucene.search.QueryConverter;
+import org.apache.blur.lucene.search.QueryConverterImpl;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.lucene.index.IndexDeletionPolicy;
+import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
+import org.apache.lucene.search.similarities.DefaultSimilarity;
+import org.apache.lucene.search.similarities.Similarity;
+
+public class TableContext {
+
+  private static final String LOGS = "logs";
+
+  private Path tablePath;
+  private Path walTablePath;
+  private BlurAnalyzer analyzer;
+  private String defaultFieldName;
+  private String table;
+  private IndexDeletionPolicy indexDeletionPolicy;
+  private Similarity similarity;
+  private QueryConverter queryConverter;
+  private Configuration configuration;
+  private TableDescriptor descriptor;
+  private long timeBetweenCommits;
+  private long timeBetweenRefreshs;
+
+  protected TableContext() {
+
+  }
+
+  public static TableContext create(TableDescriptor tableDescriptor) {
+    Configuration configuration = new Configuration();
+    Map<String, String> properties = tableDescriptor.getProperties();
+    if (properties != null) {
+      for (Entry<String, String> prop : properties.entrySet()) {
+        configuration.set(prop.getKey(), prop.getValue());
+      }
+    }
+
+    TableContext tableContext = new TableContext();
+    tableContext.configuration = configuration;
+    tableContext.tablePath = new Path(tableDescriptor.getStoragePath());
+    tableContext.walTablePath = new Path(tableContext.tablePath, LOGS);
+    tableContext.analyzer = new BlurAnalyzer(tableDescriptor.getAnalyzer());
+    tableContext.defaultFieldName = tableDescriptor.getDefaultFieldName();
+    tableContext.table = tableDescriptor.getName();
+    tableContext.descriptor = tableDescriptor;
+    tableContext.timeBetweenCommits = configuration.getLong(BLUR_SHARD_TIME_BETWEEN_COMMITS, 60000);
+    tableContext.timeBetweenRefreshs = configuration.getLong(BLUR_SHARD_TIME_BETWEEN_REFRESHS, 5000);
+
+    Class<?> c1 = configuration.getClass(BLUR_LUCENE_INDEX_DELETION_POLICY_CLASS, KeepOnlyLastCommitDeletionPolicy.class);
+    tableContext.indexDeletionPolicy = (IndexDeletionPolicy) configure(ReflectionUtils.newInstance(c1, configuration), tableContext);
+    Class<?> c2 = configuration.getClass(BLUR_LUCENE_INDEX_SIMILARITY_CLASS, DefaultSimilarity.class);
+    tableContext.similarity = (Similarity) configure(ReflectionUtils.newInstance(c2, configuration), tableContext);
+    Class<?> c3 = configuration.getClass(BLUR_QUERY_CONVERTER_CLASS, QueryConverterImpl.class);
+    tableContext.queryConverter = (QueryConverter) configure(ReflectionUtils.newInstance(c3, configuration), tableContext);
+    return tableContext;
+  }
+
+  private static Object configure(Object o, TableContext tableContext) {
+    if (o instanceof Configurable) {
+      ((Configurable) o).setTableContext(tableContext);
+    }
+    return o;
+  }
+
+  public IndexDeletionPolicy getIndexDeletionPolicy() {
+    return indexDeletionPolicy;
+  }
+
+  public void setIndexDeletionPolicy(IndexDeletionPolicy indexDeletionPolicy) {
+    this.indexDeletionPolicy = indexDeletionPolicy;
+  }
+
+  public Similarity getSimilarity() {
+    return similarity;
+  }
+
+  public void setSimilarity(Similarity similarity) {
+    this.similarity = similarity;
+  }
+
+  public long getTimeBetweenCommits() {
+    return timeBetweenCommits;
+  }
+
+  public void setTimeBetweenCommits(long timeBetweenCommits) {
+    this.timeBetweenCommits = timeBetweenCommits;
+  }
+
+  public long getTimeBetweenRefreshs() {
+    return timeBetweenRefreshs;
+  }
+
+  public void setTimeBetweenRefreshs(long timeBetweenRefreshs) {
+    this.timeBetweenRefreshs = timeBetweenRefreshs;
+  }
+
+  public BlurAnalyzer getAnalyzer() {
+    return analyzer;
+  }
+
+  public void setAnalyzer(BlurAnalyzer analyzer) {
+    this.analyzer = analyzer;
+  }
+
+  public String getTable() {
+    return table;
+  }
+
+  public void setTable(String table) {
+    this.table = table;
+  }
+
+  public QueryConverter getQueryConverter() {
+    return queryConverter;
+  }
+
+  public void setQueryConverter(QueryConverter queryConverter) {
+    this.queryConverter = queryConverter;
+  }
+
+  public Configuration getConfiguration() {
+    return configuration;
+  }
+
+  public void setConfiguration(Configuration configuration) {
+    this.configuration = configuration;
+  }
+
+  public TableDescriptor getDescriptor() {
+    return descriptor;
+  }
+
+  public void setDescriptor(TableDescriptor descriptor) {
+    this.descriptor = descriptor;
+  }
+
+  public Path getTablePath() {
+    return tablePath;
+  }
+
+  public void setTablePath(Path tablePath) {
+    this.tablePath = tablePath;
+  }
+
+  public Path getWalTablePath() {
+    return walTablePath;
+  }
+
+  public void setWalTablePath(Path walTablePath) {
+    this.walTablePath = walTablePath;
+  }
+
+  public String getDefaultFieldName() {
+    return defaultFieldName;
+  }
+
+  public void setDefaultFieldName(String defaultFieldName) {
+    this.defaultFieldName = defaultFieldName;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f92d94a9/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurServer.java b/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurServer.java
index 064e99f..876810c 100644
--- a/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurServer.java
+++ b/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurServer.java
@@ -31,8 +31,6 @@ import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_INDEX_WARMUP_CLASS;
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_OPENER_THREAD_COUNT;
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_SAFEMODEDELAY;
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_SERVER_THRIFT_THREAD_COUNT;
-import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_TIME_BETWEEN_COMMITS;
-import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_TIME_BETWEEN_REFRESHS;
 import static org.apache.blur.utils.BlurConstants.BLUR_ZOOKEEPER_CONNECTION;
 import static org.apache.blur.utils.BlurConstants.BLUR_ZOOKEEPER_SYSTEM_TIME_TOLERANCE;
 import static org.apache.blur.utils.BlurUtil.quietClose;
@@ -70,8 +68,6 @@ import org.apache.blur.utils.BlurConstants;
 import org.apache.blur.utils.BlurUtil;
 import org.apache.blur.zookeeper.ZkUtils;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.lucene.index.IndexDeletionPolicy;
-import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.ZooKeeper;
@@ -172,7 +168,6 @@ public class ThriftBlurServer extends ThriftServer {
 
     BlurFilterCache filterCache = getFilterCache(configuration);
     BlurIndexWarmup indexWarmup = getIndexWarmup(configuration);
-    IndexDeletionPolicy indexDeletionPolicy = new KeepOnlyLastCommitDeletionPolicy();
 
     final DistributedIndexServer indexServer = new DistributedIndexServer();
     indexServer.setBlurMetrics(blurMetrics);
@@ -187,9 +182,6 @@ public class ThriftBlurServer extends ThriftServer {
     indexServer.setFilterCache(filterCache);
     indexServer.setSafeModeDelay(configuration.getLong(BLUR_SHARD_SAFEMODEDELAY, 60000));
     indexServer.setWarmup(indexWarmup);
-    indexServer.setIndexDeletionPolicy(indexDeletionPolicy);
-    indexServer.setTimeBetweenCommits(configuration.getLong(BLUR_SHARD_TIME_BETWEEN_COMMITS, 60000));
-    indexServer.setTimeBetweenRefreshs(configuration.getLong(BLUR_SHARD_TIME_BETWEEN_REFRESHS, 500));
     indexServer.init();
 
     TableLayout layout = new TableLayout() {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f92d94a9/src/blur-core/src/main/java/org/apache/blur/utils/BlurConstants.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/utils/BlurConstants.java b/src/blur-core/src/main/java/org/apache/blur/utils/BlurConstants.java
index 650f3c3..4b33aee 100644
--- a/src/blur-core/src/main/java/org/apache/blur/utils/BlurConstants.java
+++ b/src/blur-core/src/main/java/org/apache/blur/utils/BlurConstants.java
@@ -65,6 +65,10 @@ public class BlurConstants {
 
   public static final String BLUR_SHARD_TIME_BETWEEN_COMMITS = "blur.shard.time.between.commits";
   public static final String BLUR_SHARD_TIME_BETWEEN_REFRESHS = "blur.shard.time.between.refreshs";
+  
+  public static final String BLUR_QUERY_CONVERTER_CLASS = "blur.query.converter.class";
+  public static final String BLUR_LUCENE_INDEX_SIMILARITY_CLASS = "blur.lucene.index.similarity.class";
+  public static final String BLUR_LUCENE_INDEX_DELETION_POLICY_CLASS = "blur.lucene.index.deletion.policy.class";
 
   public static final String BLUR_CONTROLLER_SERVER_THRIFT_THREAD_COUNT = "blur.controller.server.thrift.thread.count";
   public static final String BLUR_CONTROLLER_SERVER_REMOTE_THREAD_COUNT = "blur.controller.server.remote.thread.count";


Mime
View raw message