incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [1/2] git commit: Distributed search and addDocuments now work with routing.
Date Sun, 25 Nov 2012 01:34:47 GMT
Updated Branches:
  refs/heads/0.2-dev-removing-old-thrift aee531c28 -> 3d131d29e


Distributed search and addDocuments now work with routing.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/3d131d29
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/3d131d29
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/3d131d29

Branch: refs/heads/0.2-dev-removing-old-thrift
Commit: 3d131d29e69b621916bffc6240b7e7c6d4271be6
Parents: cb538ba
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Sat Nov 24 20:34:15 2012 -0500
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Sat Nov 24 20:34:15 2012 -0500

----------------------------------------------------------------------
 .../org/apache/blur/thrift/BlurShardServer.java    |  168 ++++++++++-----
 .../java/org/apache/blur/thrift/SessionInfo.java   |   10 +
 .../java/org/apache/blur/thrift/TableLayout.java   |   11 +
 .../apache/blur/thrift/ThriftBlurShardServer.java  |   36 +++-
 .../src/test/java/org/apache/blur/MiniCluster.java |    7 +-
 .../org/apache/blur/thrift/BlurClusterTest.java    |   39 ++--
 .../java/org/apache/blur/thrift/BlurClient.java    |   30 +++-
 .../org/apache/blur/thrift/BlurClientManager.java  |    6 +-
 8 files changed, 234 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3d131d29/src/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java b/src/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
index 026530a..83d2e00 100644
--- a/src/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
+++ b/src/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
@@ -44,6 +44,10 @@ import org.apache.blur.log.LogFactory;
 import org.apache.blur.manager.IndexManager;
 import org.apache.blur.manager.IndexServer;
 import org.apache.blur.manager.writer.BlurIndex;
+import org.apache.blur.thrift.BlurShardServer.SearchAction.ACTION;
+import org.apache.blur.thrift.TableLayout.TYPE;
+import org.apache.blur.thrift.commands.BlurCommand;
+import org.apache.blur.thrift.generated.Blur.Client;
 import org.apache.blur.thrift.generated.Blur.Iface;
 import org.apache.blur.thrift.generated.BlurException;
 import org.apache.blur.thrift.generated.Document;
@@ -52,6 +56,7 @@ import org.apache.blur.thrift.generated.MutateOptions;
 import org.apache.blur.thrift.generated.QueryArgs;
 import org.apache.blur.thrift.generated.QueryStatus;
 import org.apache.blur.thrift.generated.Session;
+import org.apache.blur.thrift.generated.TableDescriptor;
 import org.apache.blur.thrift.generated.TableStats;
 import org.apache.blur.thrift.generated.Term;
 import org.apache.blur.thrift.generated.TopFieldDocs;
@@ -79,6 +84,7 @@ public class BlurShardServer extends TableAdmin implements Iface {
   private ExecutorService _indexSearcherExecutor;
   private ExecutorService _searchExecutor;
   private int _dataFetchThreadCount = 32;
+  private TableLayout _layout;
 
   public void init() throws BlurException {
     _dataFetch = Executors.newThreadPool("data-fetch-", _dataFetchThreadCount);
@@ -93,15 +99,14 @@ public class BlurShardServer extends TableAdmin implements Iface {
     _maxTimeToLive = _configuration.getLong(BLUR_SHARD_CACHE_MAX_TIMETOLIVE, TimeUnit.MINUTES.toMillis(1));
   }
 
-
   @Override
   public TableStats tableStats(String table) throws BlurException, TException {
     checkTable(table);
     try {
       TableStats tableStats = new TableStats();
-//      tableStats.tableName = table;
-//      tableStats.recordCount = _indexServer.getRecordCount(table);
-//      tableStats.rowCount = _indexServer.getRowCount(table);
+      // tableStats.tableName = table;
+      // tableStats.recordCount = _indexServer.getRecordCount(table);
+      // tableStats.rowCount = _indexServer.getRowCount(table);
       tableStats.bytes = _indexServer.getTableSize(table);
       tableStats.queries = 0;
       return tableStats;
@@ -119,7 +124,6 @@ public class BlurShardServer extends TableAdmin implements Iface {
     }
   }
 
-
   public long getMaxTimeToLive() {
     return _maxTimeToLive;
   }
@@ -144,7 +148,6 @@ public class BlurShardServer extends TableAdmin implements Iface {
     _indexServer = indexServer;
   }
 
-
   public int getDataFetchThreadCount() {
     return _dataFetchThreadCount;
   }
@@ -163,6 +166,11 @@ public class BlurShardServer extends TableAdmin implements Iface {
 
   @Override
   public Session openReadSession(String table) throws BlurException, TException {
+    String uuid = UUID.randomUUID().toString();
+    return newSession(table, uuid);
+  }
+
+  private Session newSession(String table, String uuid) throws BlurException {
     checkTable(table);
     BlurAnalyzer analyzer = _indexServer.getAnalyzer(table);
     Map<String, BlurIndex> blurIndexes;
@@ -172,11 +180,11 @@ public class BlurShardServer extends TableAdmin implements Iface {
       LOG.error("Unknown error while trying to fetch index readers.", e);
       throw new BException(e.getMessage(), e);
     }
-
-    String uuid = UUID.randomUUID().toString();
+    TableDescriptor tableDescriptor = _clusterStatus.getTableDescriptor(true, table);
     SessionInfo sessionInfo = new SessionInfo();
     sessionInfo.setUuid(uuid);
     sessionInfo.setAnalyzer(analyzer);
+    sessionInfo.setTableDescriptor(tableDescriptor);
 
     for (Entry<String, BlurIndex> entry : blurIndexes.entrySet()) {
       int index = BlurUtil.getShardIndex(entry.getKey());
@@ -195,12 +203,17 @@ public class BlurShardServer extends TableAdmin implements Iface {
   }
 
   @Override
-  public List<TopFieldDocs> search(Session session, QueryArgs queryArgs) throws BlurException, TException {
+  public List<TopFieldDocs> search(final Session session, final QueryArgs queryArgs) throws BlurException, TException {
     SessionInfo info = getSessionInfo(session);
+    if (info == null) {
+      newSession("test", session.getSessionId());
+      info = getSessionInfo(session); 
+    }
     try {
       Map<Integer, IndexSearcher> searchers = info.getSearchers();
       List<Integer> shardIndexes = queryArgs.getShardIndexes();
-      Collection<Entry<Integer, IndexSearcher>> searchersToSearch = getSearchers(shardIndexes, searchers);
+      TableDescriptor tableDescriptor = info.getTableDescriptor();
+      Collection<SearchAction> searchersToSearch = getSearchActions(tableDescriptor, shardIndexes, searchers);
 
       List<Future<TopFieldDocs>> futures = new ArrayList<Future<TopFieldDocs>>(searchersToSearch.size());
       Query query = Convert.toLuceneQuery(queryArgs.query);
@@ -210,10 +223,32 @@ public class BlurShardServer extends TableAdmin implements Iface {
       boolean doDocScores = queryArgs.isDoDocScores();
       boolean doMaxScore = queryArgs.isDoMaxScore();
       int numberToFetch = queryArgs.getNumberToFetch();
-      for (Entry<Integer, IndexSearcher> entry : searchersToSearch) {
-        Future<TopFieldDocs> future = _indexSearcherExecutor.submit(new SearchCallable(entry.getKey(), entry.getValue(), after, query, filter, sort, numberToFetch, doDocScores,
-            doMaxScore));
-        futures.add(future);
+      for (SearchAction action : searchersToSearch) {
+        final int shardIndex = action.shardIndex;
+        if (action.type == ACTION.LOCAL) {
+          SearchCallable searchCallable = new SearchCallable(shardIndex, action.indexSearcher, after, query, filter, sort, numberToFetch, doDocScores, doMaxScore);
+          Future<TopFieldDocs> future = _indexSearcherExecutor.submit(searchCallable);
+          futures.add(future);
+        } else if (action.type == ACTION.REMOTE) {
+          // @TODO need to send only one call per server, instead of one for
+          // each shard server
+          final Connection connection = action.remoteServer;
+          Future<TopFieldDocs> future = _indexSearcherExecutor.submit(new Callable<TopFieldDocs>() {
+            @Override
+            public TopFieldDocs call() throws Exception {
+              List<TopFieldDocs> list = BlurClientManager.execute(connection, new BlurCommand<List<TopFieldDocs>>() {
+                @Override
+                  public List<TopFieldDocs> call(Client client) throws BlurException, TException {
+                  QueryArgs remoteQueryArgs = new QueryArgs(queryArgs);
+                  remoteQueryArgs.addToShardIndexes(shardIndex);
+                  return client.search(session, remoteQueryArgs);
+                }
+              });
+              return list.iterator().next();
+            }
+          });
+          futures.add(future);
+        }
       }
 
       List<TopFieldDocs> result = new ArrayList<TopFieldDocs>(futures.size());
@@ -260,7 +295,7 @@ public class BlurShardServer extends TableAdmin implements Iface {
       List<org.apache.blur.thrift.generated.ScoreDoc> scoreDocs = topFieldDocs.getScoreDocs();
       for (org.apache.blur.thrift.generated.ScoreDoc sd : scoreDocs) {
         long docLocation = sd.getDocLocation();
-//        docLocation.setShardIndex(shardIndex);
+        // docLocation.setShardIndex(shardIndex);
       }
       return topFieldDocs;
     }
@@ -282,34 +317,47 @@ public class BlurShardServer extends TableAdmin implements Iface {
     }
   }
 
-  private Collection<Entry<Integer, IndexSearcher>> getSearchers(List<Integer> shardIndexes, Map<Integer, IndexSearcher> searchers) {
-    Collection<Entry<Integer, IndexSearcher>> searchersToSearch = new ArrayList<Map.Entry<Integer, IndexSearcher>>();
-    if (shardIndexes == null) {
-      // all indexes
-      searchersToSearch.addAll(searchers.entrySet());
-    } else {
-      for (Integer i : shardIndexes) {
-        final Integer index = i;
-        final IndexSearcher searcher = searchers.get(index);
-        if (searcher != null) {
-          searchersToSearch.add(new Entry<Integer, IndexSearcher>() {
+  static class SearchAction {
+    enum ACTION {
+      LOCAL, REMOTE
+    }
 
-            @Override
-            public IndexSearcher setValue(IndexSearcher value) {
-              throw new RuntimeException("Not Supported");
-            }
+    ACTION type;
 
-            @Override
-            public IndexSearcher getValue() {
-              return searcher;
-            }
+    SearchAction(int shardIndex, IndexSearcher indexSearcher) {
+      this.type = ACTION.LOCAL;
+      this.shardIndex = shardIndex;
+      this.indexSearcher = indexSearcher;
+    }
 
-            @Override
-            public Integer getKey() {
-              return index;
-            }
-          });
-        }
+    SearchAction(int shardIndex, Connection remoteServer) {
+      this.type = ACTION.REMOTE;
+      this.shardIndex = shardIndex;
+      this.remoteServer = remoteServer;
+    }
+
+    int shardIndex;
+    IndexSearcher indexSearcher;
+    Connection remoteServer;
+  }
+
+  private Collection<SearchAction> getSearchActions(TableDescriptor tableDescriptor, List<Integer> shardIndexes, Map<Integer, IndexSearcher> searchers) throws BlurException {
+    String name = tableDescriptor.getName();
+    int shardCount = tableDescriptor.getShardCount();
+    Collection<SearchAction> searchersToSearch = new ArrayList<SearchAction>();
+    if (shardIndexes == null) {
+      shardIndexes = new ArrayList<Integer>(shardCount);
+      // all indexes
+      for (int i = 0; i < shardCount; i++) {
+        shardIndexes.add(i);
+      }
+    }
+    for (Integer index : shardIndexes) {
+      IndexSearcher searcher = searchers.get(index);
+      if (searcher != null) {
+        searchersToSearch.add(new SearchAction(index, searcher));
+      } else {
+        searchersToSearch.add(new SearchAction(index, getConnection(name, index)));
       }
     }
     return searchersToSearch;
@@ -356,16 +404,34 @@ public class BlurShardServer extends TableAdmin implements Iface {
     try {
       BlurIndex index = getIndex(table, shardIndex);
       if (index == null) {
-        System.out.println("Needs to be routed to the correct server [" + shardIndex + "].");
-        return;
+        forwardAddDocuments(options, documents);
+      } else {
+        index.addDocuments(waitToBeVisible, writeAheadLog, documents);
       }
-      index.addDocuments(waitToBeVisible, writeAheadLog, documents);
     } catch (Throwable t) {
       LOG.error("Unknown error", t);
       throw new BException(t.getMessage(), t);
     }
   }
 
+  private void forwardAddDocuments(final MutateOptions options, final List<Document> documents) throws BlurException, TException, IOException {
+    String table = options.getTable();
+    int shardIndex = options.getShardIndex();
+    Connection connection = getConnection(table, shardIndex);
+    BlurClientManager.execute(connection, new BlurCommand<Void>() {
+      @Override
+      public Void call(Client client) throws BlurException, TException {
+        client.addDocuments(options, documents);
+        return null;
+      }
+    });
+  }
+
+  private Connection getConnection(String table, int shardIndex) {
+    String server = _layout.findServer(table, shardIndex, TYPE.WRITABLE);
+    return new Connection(server);
+  }
+
   @Override
   public void deleteDocumentsByQueries(MutateOptions options, List<ByteBuffer> queries) throws BlurException, TException {
     String table = options.getTable();
@@ -423,33 +489,37 @@ public class BlurShardServer extends TableAdmin implements Iface {
     return index;
   }
 
-
   @Override
   public List<Integer> serverLayout(String table, String server) throws BlurException, TException {
     throw new BlurException("Not implemented", null);
   }
 
-
   @Override
   public void cancelQuery(Session session, long id) throws BlurException, TException {
-    throw new BlurException("Not implemented", null);    
+    throw new BlurException("Not implemented", null);
   }
 
-
   @Override
   public List<Long> queryStatusIdList(Session session) throws BlurException, TException {
     throw new BlurException("Not implemented", null);
   }
 
-
   @Override
   public QueryStatus queryStatus(Session session, long id) throws BlurException, TException {
     throw new BlurException("Not implemented", null);
   }
 
-
   @Override
   public LiveSchema schema(String table) throws BlurException, TException {
     throw new BlurException("Not implemented", null);
   }
+
+  public TableLayout getLayout() {
+    return _layout;
+  }
+
+  public void setLayout(TableLayout layout) {
+    this._layout = layout;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3d131d29/src/blur-core/src/main/java/org/apache/blur/thrift/SessionInfo.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/SessionInfo.java b/src/blur-core/src/main/java/org/apache/blur/thrift/SessionInfo.java
index f0bb718..9d093af 100644
--- a/src/blur-core/src/main/java/org/apache/blur/thrift/SessionInfo.java
+++ b/src/blur-core/src/main/java/org/apache/blur/thrift/SessionInfo.java
@@ -24,6 +24,7 @@ import java.util.Map.Entry;
 import org.apache.blur.analysis.BlurAnalyzer;
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
+import org.apache.blur.thrift.generated.TableDescriptor;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.search.IndexSearcher;
 
@@ -35,6 +36,7 @@ public class SessionInfo {
   private BlurAnalyzer analyzer;
   private Map<Integer, IndexReader> readers = new HashMap<Integer, IndexReader>();
   private Map<Integer, IndexSearcher> searchers = new HashMap<Integer, IndexSearcher>();
+  private TableDescriptor tableDescriptor;
 
   public BlurAnalyzer getAnalyzer() {
     return analyzer;
@@ -87,4 +89,12 @@ public class SessionInfo {
     }
   }
 
+  public TableDescriptor getTableDescriptor() {
+    return tableDescriptor;
+  }
+
+  public void setTableDescriptor(TableDescriptor tableDescriptor) {
+    this.tableDescriptor = tableDescriptor;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3d131d29/src/blur-core/src/main/java/org/apache/blur/thrift/TableLayout.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/TableLayout.java b/src/blur-core/src/main/java/org/apache/blur/thrift/TableLayout.java
new file mode 100644
index 0000000..e1628e7
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/thrift/TableLayout.java
@@ -0,0 +1,11 @@
+package org.apache.blur.thrift;
+
+public abstract class TableLayout {
+
+  public enum TYPE {
+    WRITABLE, ONLINE, BACKUP
+  }
+  
+  public abstract String findServer(String table, int shard, TYPE type);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3d131d29/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java b/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
index 79855f4..0fb9828 100644
--- a/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
+++ b/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
@@ -39,6 +39,11 @@ import static org.apache.blur.utils.BlurConstants.BLUR_ZOOKEEPER_CONNECTION;
 import static org.apache.blur.utils.BlurConstants.BLUR_ZOOKEEPER_SYSTEM_TIME_TOLERANCE;
 import static org.apache.blur.utils.BlurUtil.quietClose;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.blur.BlurConfiguration;
 import org.apache.blur.concurrent.SimpleUncaughtExceptionHandler;
 import org.apache.blur.concurrent.ThreadWatcher;
@@ -54,6 +59,7 @@ import org.apache.blur.manager.indexserver.BlurServerShutDown;
 import org.apache.blur.manager.indexserver.BlurServerShutDown.BlurShutdown;
 import org.apache.blur.manager.indexserver.DefaultBlurIndexWarmup;
 import org.apache.blur.manager.indexserver.DistributedIndexServer;
+import org.apache.blur.manager.indexserver.DistributedLayoutManager;
 import org.apache.blur.manager.writer.BlurIndexRefresher;
 import org.apache.blur.metrics.BlurMetrics;
 import org.apache.blur.store.blockcache.BlockCache;
@@ -162,7 +168,7 @@ public class ThriftBlurShardServer extends ThriftServer {
 
     BlurUtil.setupZookeeper(zooKeeper, configuration.get(BLUR_CLUSTER_NAME));
 
-    final ZookeeperClusterStatus clusterStatus = new ZookeeperClusterStatus(BlurConstants.BLUR_CLUSTER,zooKeeper);
+    final ZookeeperClusterStatus clusterStatus = new ZookeeperClusterStatus(BlurConstants.BLUR_CLUSTER, zooKeeper);
 
     final BlurIndexRefresher refresher = new BlurIndexRefresher();
     refresher.init();
@@ -197,18 +203,44 @@ public class ThriftBlurShardServer extends ThriftServer {
     indexManager.setFilterCache(filterCache);
     indexManager.init();
 
+    TableLayout layout = new TableLayout() {
+      @Override
+      public String findServer(String table, int shard, TYPE type) {
+        DistributedLayoutManager manager = new DistributedLayoutManager();
+        List<String> onlineServers = clusterStatus.getOnlineServers(true);
+        List<String> offlineServers = clusterStatus.getOfflineServers(true);
+        manager.setNodes(onlineServers);
+        manager.setNodesOffline(offlineServers);
+        manager.setShards(getShardList(clusterStatus.getTableDescriptor(true, table).getShardCount()));
+        manager.init();
+
+        Map<String, String> map = manager.getLayout();
+        String server = map.get(BlurUtil.getShardName(BlurConstants.SHARD_PREFIX, shard));
+        return server;
+      }
+
+      private Collection<String> getShardList(int shardCount) {
+        List<String> list = new ArrayList<String>();
+        for (int i = 0; i < shardCount; i++) {
+          list.add(BlurUtil.getShardName(BlurConstants.SHARD_PREFIX, i));
+        }
+        return list;
+      }
+    };
+
     final BlurShardServer shardServer = new BlurShardServer();
     shardServer.setIndexServer(indexServer);
     shardServer.setIndexManager(indexManager);
     shardServer.setZookeeper(zooKeeper);
     shardServer.setClusterStatus(clusterStatus);
     shardServer.setConfiguration(configuration);
+    shardServer.setLayout(layout);
     shardServer.init();
 
     Iface iface = BlurUtil.recordMethodCallsAndAverageTimes(blurMetrics, shardServer, Iface.class);
 
     int threadCount = configuration.getInt(BLUR_SHARD_SERVER_THRIFT_THREAD_COUNT, 32);
-    
+
     Processor<Iface> processor = new Processor<Iface>(iface);
 
     final ThriftBlurShardServer server = new ThriftBlurShardServer();

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3d131d29/src/blur-core/src/test/java/org/apache/blur/MiniCluster.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/test/java/org/apache/blur/MiniCluster.java b/src/blur-core/src/test/java/org/apache/blur/MiniCluster.java
index f649f58..777d880 100644
--- a/src/blur-core/src/test/java/org/apache/blur/MiniCluster.java
+++ b/src/blur-core/src/test/java/org/apache/blur/MiniCluster.java
@@ -346,8 +346,13 @@ public abstract class MiniCluster {
         // here because of an InterruptedException. Don't let exceptions in
         // here be cause of test failure.
       }
+      FileSystem fs = null;
+      try {
+        fs = cluster.getFileSystem();
+      } catch (IOException e) {
+        // If this errors out then the file system was closed.
+      }
       try {
-        FileSystem fs = cluster.getFileSystem();
         if (fs != null) {
           LOG.info("Shutting down FileSystem");
           fs.close();

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3d131d29/src/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java b/src/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java
index 39b428e..de97d6d 100644
--- a/src/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java
+++ b/src/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java
@@ -55,7 +55,9 @@ import org.junit.Test;
 
 public class BlurClusterTest {
 
+  private static final int SHARD_COUNT = 5;
   private static final File TMPDIR = new File(System.getProperty("blur.tmp.dir", "/tmp"));
+  private static final int SHARD_SERVERS = 2;
 
   @BeforeClass
   public static void startCluster() throws IOException {
@@ -77,7 +79,7 @@ public class BlurClusterTest {
     System.setProperty("dfs.datanode.data.dir.perm", dirPermissionNum);
     testDirectory.delete();
 
-    MiniCluster.startBlurCluster("cluster", 1);
+    MiniCluster.startBlurCluster("cluster", SHARD_SERVERS);
   }
 
   @AfterClass
@@ -90,7 +92,7 @@ public class BlurClusterTest {
     Blur.Iface client = getClient();
     TableDescriptor tableDescriptor = new TableDescriptor();
     tableDescriptor.setName("test");
-    tableDescriptor.setShardCount(5);
+    tableDescriptor.setShardCount(SHARD_COUNT);
     tableDescriptor.setStoragePath(MiniCluster.getFileSystemUri().toString() + "/blur/test");
     client.createTable(tableDescriptor);
     List<String> tableList = client.tableList();
@@ -98,24 +100,27 @@ public class BlurClusterTest {
   }
 
   private Iface getClient() {
-    return BlurClient.getClient(MiniCluster.getConnectionStr());
+    return BlurClient.getClient(MiniCluster.getConnectionStr(), 0, 0, 0);
   }
 
   @Test
   public void testLoadTable() throws BlurException, TException, InterruptedException, IOException {
     Iface client = getClient();
-    int length = 100;
-    List<Document> documents = new ArrayList<Document>();
-    for (int i = 0; i < length; i++) {
-      Document doc = new Document();
-      doc.addToFields(new Field("id", ByteBuffer.wrap(UUID.randomUUID().toString().getBytes()), TYPE.STRING, 1.0));
-      doc.addToFields(new Field("value", ByteBuffer.wrap("test".getBytes()), TYPE.STRING, 1.0));
-      documents.add(doc);
+    for (int s = 0; s < SHARD_COUNT; s++) {
+      int length = 100;
+      List<Document> documents = new ArrayList<Document>();
+      for (int i = 0; i < length; i++) {
+        Document doc = new Document();
+        doc.addToFields(new Field("id", ByteBuffer.wrap(UUID.randomUUID().toString().getBytes()), TYPE.STRING, 1.0));
+        doc.addToFields(new Field("value", ByteBuffer.wrap("test".getBytes()), TYPE.STRING, 1.0));
+        documents.add(doc);
+      }
+      MutateOptions options = new MutateOptions();
+      options.setTable("test");
+      options.setWaitToBeVisible(true);
+      options.setShardIndex(s);
+      client.addDocuments(options, documents);
     }
-    MutateOptions options = new MutateOptions();
-    options.setTable("test");
-    options.setWaitToBeVisible(true);
-    client.addDocuments(options, documents);
 
     Session session = client.openReadSession("test");
     QueryArgs queryArgs = new QueryArgs();
@@ -123,6 +128,10 @@ public class BlurClusterTest {
     TermQuery query = new TermQuery(term);
     queryArgs.setQuery(Convert.toBytes(query));
     List<TopFieldDocs> results = client.search(session, queryArgs);
-    System.out.println(results);
+    long totalHits = 0;
+    for (TopFieldDocs fieldDocs : results) {
+      totalHits += fieldDocs.getTotalHits();
+    }
+    assertEquals(SHARD_COUNT * 100, totalHits);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3d131d29/src/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClient.java
----------------------------------------------------------------------
diff --git a/src/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClient.java b/src/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClient.java
index 0e21a63..13467af 100644
--- a/src/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClient.java
+++ b/src/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClient.java
@@ -34,9 +34,19 @@ public class BlurClient {
   static class BlurClientInvocationHandler implements InvocationHandler {
 
     private List<Connection> connections;
+    private long maxBackOffTime;
+    private long backOffTime;
+    private int maxRetries;
 
     public BlurClientInvocationHandler(List<Connection> connections) {
+      this(connections, BlurClientManager.MAX_RETRIES, BlurClientManager.BACK_OFF_TIME, BlurClientManager.MAX_BACK_OFF_TIME);
+    }
+
+    public BlurClientInvocationHandler(List<Connection> connections, int maxRetries, long backOffTime, long maxBackOffTime) {
       this.connections = connections;
+      this.maxRetries = maxRetries;
+      this.backOffTime = backOffTime;
+      this.maxBackOffTime = maxBackOffTime;
     }
 
     @Override
@@ -61,7 +71,7 @@ public class BlurClient {
             throw new RuntimeException(targetException);
           }
         }
-      });
+      }, maxRetries, backOffTime, maxBackOffTime);
     }
 
   }
@@ -73,10 +83,11 @@ public class BlurClient {
    * Blur.Iface client = Blur.getClient(&quot;controller1:40010,controller2:40010&quot;);
    * </pre>
    * 
-   * The connectionStr also supports passing a proxy host/port (e.g. a SOCKS proxy configuration):
+   * The connectionStr also supports passing a proxy host/port (e.g. a SOCKS
+   * proxy configuration):
    * 
    * <pre>
-   * Blur.Iface client = Blur.getClient("host1:port/proxyhost1:proxyport");
+   * Blur.Iface client = Blur.getClient(&quot;host1:port/proxyhost1:proxyport&quot;);
    * </pre>
    * 
    * @param connectionStr
@@ -88,12 +99,25 @@ public class BlurClient {
     return getClient(connections);
   }
 
+  public static Iface getClient(String connectionStr, int maxRetries, long backOffTime, long maxBackOffTime) {
+    List<Connection> connections = BlurClientManager.getConnections(connectionStr);
+    return getClient(connections, maxRetries, backOffTime, maxBackOffTime);
+  }
+
   public static Iface getClient(Connection connection) {
     return getClient(Arrays.asList(connection));
   }
+  
+  public static Iface getClient(Connection connection, int maxRetries, long backOffTime, long maxBackOffTime) {
+    return getClient(Arrays.asList(connection), maxRetries, backOffTime, maxBackOffTime);
+  }
 
   public static Iface getClient(List<Connection> connections) {
     return (Iface) Proxy.newProxyInstance(Iface.class.getClassLoader(), new Class[] { Iface.class }, new BlurClientInvocationHandler(connections));
   }
+  
+  public static Iface getClient(List<Connection> connections, int maxRetries, long backOffTime, long maxBackOffTime) {
+    return (Iface) Proxy.newProxyInstance(Iface.class.getClassLoader(), new Class[] { Iface.class }, new BlurClientInvocationHandler(connections, maxRetries, backOffTime, maxBackOffTime));
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3d131d29/src/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClientManager.java
----------------------------------------------------------------------
diff --git a/src/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClientManager.java b/src/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClientManager.java
index c6da22e..da74d7a 100644
--- a/src/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClientManager.java
+++ b/src/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClientManager.java
@@ -55,9 +55,9 @@ public class BlurClientManager {
   private static final Object NULL = new Object();
 
   private static final Log LOG = LogFactory.getLog(BlurClientManager.class);
-  private static final int MAX_RETRIES = 5;
-  private static final long BACK_OFF_TIME = TimeUnit.MILLISECONDS.toMillis(250);
-  private static final long MAX_BACK_OFF_TIME = TimeUnit.SECONDS.toMillis(10);
+  public static final int MAX_RETRIES = 5;
+  public static final long BACK_OFF_TIME = TimeUnit.MILLISECONDS.toMillis(250);
+  public static final long MAX_BACK_OFF_TIME = TimeUnit.SECONDS.toMillis(10);
   private static final long ONE_SECOND = TimeUnit.SECONDS.toMillis(1);
 
   private static Map<Connection, BlockingQueue<Client>> clientPool = new ConcurrentHashMap<Connection, BlockingQueue<Client>>();


Mime
View raw message