incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [5/5] git commit: Adding a batch fetch call.
Date Fri, 01 Nov 2013 19:09:33 GMT
Adding a batch fetch call.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/416833be
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/416833be
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/416833be

Branch: refs/heads/apache-blur-0.2
Commit: 416833bef7d78bb46218de0ff577a77988c993ba
Parents: 43c3f83
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Fri Nov 1 14:54:51 2013 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Fri Nov 1 14:54:51 2013 -0400

----------------------------------------------------------------------
 .../org/apache/blur/manager/IndexManager.java   |   26 +
 .../blur/manager/results/LazyBlurResult.java    |    4 +
 .../blur/thrift/BlurControllerServer.java       |  126 +-
 .../org/apache/blur/thrift/BlurShardServer.java |   20 +-
 .../apache/blur/manager/IndexManagerTest.java   |   39 +-
 .../org/apache/blur/thrift/BlurClusterTest.java |   51 +-
 .../org/apache/blur/thrift/generated/Blur.java  | 1737 +++++++++++++++---
 .../src/main/scripts/interface/Blur.thrift      |   11 +
 .../main/scripts/interface/gen-html/Blur.html   |   12 +
 .../main/scripts/interface/gen-html/index.html  |    1 +
 .../org/apache/blur/thrift/generated/Blur.java  | 1737 +++++++++++++++---
 .../src/main/scripts/interface/gen-js/Blur.js   |  621 +++++--
 .../scripts/interface/gen-perl/Blur/Blur.pm     |  545 ++++--
 .../src/main/scripts/interface/gen-rb/blur.rb   |   65 +
 docs/Blur.html                                  |   12 +
 15 files changed, 4061 insertions(+), 946 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/416833be/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java b/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
index 739b92d..1d833dd 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
@@ -203,6 +203,32 @@ public class IndexManager {
     }
   }
 
+  public List<FetchResult> fetchRowBatch(final String table, List<Selector> selectors) throws BlurException {
+    List<Future<FetchResult>> futures = new ArrayList<Future<FetchResult>>();
+    for (Selector s : selectors) {
+      final Selector selector = s;
+      futures.add(_executor.submit(new Callable<FetchResult>() {
+        @Override
+        public FetchResult call() throws Exception {
+          FetchResult fetchResult = new FetchResult();
+          fetchRow(table, selector, fetchResult);
+          return fetchResult;
+        }
+      }));
+    }
+    List<FetchResult> results = new ArrayList<FetchResult>();
+    for (Future<FetchResult> future : futures) {
+      try {
+        results.add(future.get());
+      } catch (InterruptedException e) {
+        throw new BException("Unkown error while fetching batch table [{0}] selectors [{1}].", e, table, selectors);
+      } catch (ExecutionException e) {
+        throw new BException("Unkown error while fetching batch table [{0}] selectors [{1}].", e.getCause(), table, selectors);
+      }
+    }
+    return results;
+  }
+
   public void fetchRow(String table, Selector selector, FetchResult fetchResult) throws BlurException {
     validSelector(selector);
     BlurIndex index;

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/416833be/blur-core/src/main/java/org/apache/blur/manager/results/LazyBlurResult.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/results/LazyBlurResult.java b/blur-core/src/main/java/org/apache/blur/manager/results/LazyBlurResult.java
index f98fae3..17f1edd 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/results/LazyBlurResult.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/results/LazyBlurResult.java
@@ -38,6 +38,10 @@ public class LazyBlurResult extends BlurResult {
     _client = client;
   }
 
+  public Client getClient() {
+    return _client;
+  }
+
   public FetchResult fetchRow(String table, Selector selector) throws BlurException, TException {
     synchronized (_client) {
       return _client.fetchRow(table, selector);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/416833be/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java b/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
index 158c8e8..55da371 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -38,6 +39,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLongArray;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicReferenceArray;
 
 import org.apache.blur.concurrent.Executors;
 import org.apache.blur.log.Log;
@@ -65,6 +67,7 @@ import org.apache.blur.thrift.generated.BlurQueryStatus;
 import org.apache.blur.thrift.generated.BlurResult;
 import org.apache.blur.thrift.generated.BlurResults;
 import org.apache.blur.thrift.generated.ColumnDefinition;
+import org.apache.blur.thrift.generated.ErrorType;
 import org.apache.blur.thrift.generated.FetchResult;
 import org.apache.blur.thrift.generated.HighlightOptions;
 import org.apache.blur.thrift.generated.Query;
@@ -284,7 +287,8 @@ public class BlurControllerServer extends TableAdmin implements Iface {
     try {
       String controllerPath = ZookeeperPathConstants.getControllersPath() + "/" + _nodeName;
       if (_zookeeper.exists(controllerPath, false) == null) {
-        //Don't set the version for the registered nodes but only to the online nodes.
+        // Don't set the version for the registered nodes but only to the online
+        // nodes.
         _zookeeper.create(controllerPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
       }
     } catch (KeeperException e) {
@@ -292,7 +296,7 @@ public class BlurControllerServer extends TableAdmin implements Iface {
     } catch (InterruptedException e) {
       throw new RuntimeException(e);
     }
-    
+
     // Wait for other instances (named the same name) to die
     try {
       String version = BlurUtil.getVersion();
@@ -436,22 +440,56 @@ public class BlurControllerServer extends TableAdmin implements Iface {
       results.facetCounts = BlurUtil.toList(facetCounts);
     }
     if (selector != null) {
-      List<Future<FetchResult>> futures = new ArrayList<Future<FetchResult>>();
+
+      //Gather client objects and build batches for fetching.
+      IdentityHashMap<Client, List<Selector>> map = new IdentityHashMap<Client, List<Selector>>();
+      
+      //Need to maintain original order.
+      final IdentityHashMap<Selector, Integer> indexMap = new IdentityHashMap<Selector, Integer>();
       for (int i = 0; i < results.results.size(); i++) {
         final LazyBlurResult result = (LazyBlurResult) results.results.get(i);
-        final Selector s = new Selector(selector);
+        Client client = result.getClient();
+        Selector s = new Selector(selector);
         s.setLocationId(result.locationId);
-        futures.add(executor.submit(new Callable<FetchResult>() {
+        List<Selector> list = map.get(client);
+        if (list == null) {
+          list = new ArrayList<Selector>();
+          map.put(client, list);
+        }
+        list.add(s);
+        indexMap.put(s, i);
+      }
+
+      //Execute batch fetches
+      List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>();
+      final AtomicReferenceArray<FetchResult> fetchResults = new AtomicReferenceArray<FetchResult>(
+          results.results.size());
+      for (Entry<Client, List<Selector>> entry : map.entrySet()) {
+        final Client client = entry.getKey();
+        final List<Selector> list = entry.getValue();
+        futures.add(executor.submit(new Callable<Boolean>() {
           @Override
-          public FetchResult call() throws Exception {
-            return result.fetchRow(table, s);
+          public Boolean call() throws Exception {
+            List<FetchResult> fetchRowBatch = client.fetchRowBatch(table, list);
+            for (int i = 0; i < list.size(); i++) {
+              int index = indexMap.get(list.get(i));
+              fetchResults.set(index, fetchRowBatch.get(i));
+            }
+            return Boolean.TRUE;
           }
         }));
       }
-      for (int i = 0; i < results.results.size(); i++) {
-        Future<FetchResult> future = futures.get(i);
+
+      //Wait for all parallel calls to finish.
+      for (Future<Boolean> future : futures) {
+        future.get();
+      }
+
+      //Place fetch results into result object for response.
+      for (int i = 0; i < fetchResults.length(); i++) {
+        FetchResult fetchResult = fetchResults.get(i);
         BlurResult result = results.results.get(i);
-        result.setFetchResult(future.get());
+        result.setFetchResult(fetchResult);
         result.setLocationId(null);
       }
     }
@@ -494,6 +532,74 @@ public class BlurControllerServer extends TableAdmin implements Iface {
   }
 
   @Override
+  public List<FetchResult> fetchRowBatch(final String table, List<Selector> selectors) throws BlurException, TException {
+    checkTable(table);
+    Map<String, List<Selector>> selectorBatches = new HashMap<String, List<Selector>>();
+    final Map<String, List<Integer>> selectorBatchesIndexes = new HashMap<String, List<Integer>>();
+    int i = 0;
+    for (Selector selector : selectors) {
+      checkSelectorFetchSize(selector);
+      IndexManager.validSelector(selector);
+      String clientHostnamePort = getNode(table, selector);
+      List<Selector> list = selectorBatches.get(clientHostnamePort);
+      List<Integer> indexes = selectorBatchesIndexes.get(clientHostnamePort);
+      if (list == null) {
+        if (indexes != null) {
+          throw new BlurException("This should never happen,", null, ErrorType.UNKNOWN);
+        }
+        list = new ArrayList<Selector>();
+        indexes = new ArrayList<Integer>();
+        selectorBatches.put(clientHostnamePort, list);
+        selectorBatchesIndexes.put(clientHostnamePort, indexes);
+      }
+      list.add(selector);
+      indexes.add(i);
+      i++;
+    }
+
+    List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>();
+    final AtomicReferenceArray<FetchResult> fetchResults = new AtomicReferenceArray<FetchResult>(new FetchResult[i]);
+    for (Entry<String, List<Selector>> batch : selectorBatches.entrySet()) {
+      final String clientHostnamePort = batch.getKey();
+      final List<Selector> list = batch.getValue();
+      futures.add(_executor.submit(new Callable<Boolean>() {
+        @Override
+        public Boolean call() throws Exception {
+          List<FetchResult> fetchResultList = _client.execute(clientHostnamePort, new BlurCommand<List<FetchResult>>() {
+            @Override
+            public List<FetchResult> call(Client client) throws BlurException, TException {
+              return client.fetchRowBatch(table, list);
+            }
+          }, _maxFetchRetries, _fetchDelay, _maxFetchDelay);
+          List<Integer> indexes = selectorBatchesIndexes.get(clientHostnamePort);
+          for (int i = 0; i < fetchResultList.size(); i++) {
+            int index = indexes.get(i);
+            fetchResults.set(index, fetchResultList.get(i));
+          }
+          return Boolean.TRUE;
+        }
+      }));
+    }
+
+    for (Future<Boolean> future : futures) {
+      try {
+        future.get();
+      } catch (InterruptedException e) {
+        throw new BException("Unknown error during fetching of batch", e);
+      } catch (ExecutionException e) {
+        throw new BException("Unknown error during fetching of batch", e.getCause());
+      }
+    }
+
+    List<FetchResult> batchResult = new ArrayList<FetchResult>();
+    for (int c = 0; c < fetchResults.length(); c++) {
+      FetchResult fetchResult = fetchResults.get(c);
+      batchResult.add(fetchResult);
+    }
+    return batchResult;
+  }
+
+  @Override
   public void cancelQuery(final String table, final String uuid) throws BlurException, TException {
     checkTable(table);
     try {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/416833be/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java b/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
index 347d5e4..3189ec4 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
@@ -164,6 +164,20 @@ public class BlurShardServer extends TableAdmin implements Iface {
   }
 
   @Override
+  public List<FetchResult> fetchRowBatch(String table, List<Selector> selectors) throws BlurException, TException {
+    checkTable(_cluster, table);
+    for (Selector selector : selectors) {
+      checkSelectorFetchSize(selector);
+    }
+    try {
+      return _indexManager.fetchRowBatch(table, selectors);
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to get fetch row [table={0},selector={1}]", e, table, selectors);
+      throw new BException(e.getMessage(), e);
+    }
+  }
+
+  @Override
   public void cancelQuery(String table, String uuid) throws BlurException, TException {
     checkTable(_cluster, table);
     resetSearchers();
@@ -398,7 +412,7 @@ public class BlurShardServer extends TableAdmin implements Iface {
       throw new BException(e.getMessage(), e);
     }
   }
-  
+
   @Override
   public void createSnapshot(final String table, final String name) throws BlurException, TException {
     try {
@@ -415,7 +429,7 @@ public class BlurShardServer extends TableAdmin implements Iface {
       throw new BException(e.getMessage(), e);
     }
   }
-  
+
   @Override
   public void removeSnapshot(final String table, final String name) throws BlurException, TException {
     try {
@@ -432,7 +446,7 @@ public class BlurShardServer extends TableAdmin implements Iface {
       throw new BException(e.getMessage(), e);
     }
   }
-  
+
   @Override
   public Map<String, List<String>> listSnapshots(final String table) throws BlurException, TException {
     Map<String, List<String>> snapshots = new HashMap<String, List<String>>();

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/416833be/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java b/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java
index fd4e287..fd5f266 100644
--- a/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java
+++ b/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java
@@ -38,6 +38,7 @@ import static org.junit.Assert.fail;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -106,7 +107,8 @@ public class IndexManagerTest {
 
     BlurFilterCache filterCache = new DefaultBlurFilterCache(new BlurConfiguration());
     long statusCleanupTimerDelay = 1000;
-    indexManager = new IndexManager(server,getClusterStatus(tableDescriptor),filterCache,10000000,100,1,1,statusCleanupTimerDelay);
+    indexManager = new IndexManager(server, getClusterStatus(tableDescriptor), filterCache, 10000000, 100, 1, 1,
+        statusCleanupTimerDelay);
     setupData();
   }
 
@@ -296,7 +298,7 @@ public class IndexManagerTest {
     row.recordCount = 3;
     assertEquals(row, fetchResult.rowResult.row);
   }
-  
+
   @Test
   public void testFetchRowByRowIdHighlightingWithFullText() throws Exception {
     Selector selector = new Selector().setRowId("row-6");
@@ -313,7 +315,7 @@ public class IndexManagerTest {
     row.recordCount = 3;
     assertEquals(row, fetchResult.rowResult.row);
   }
-  
+
   @Test
   public void testFetchRowByRowIdHighlightingWithFullTextWildCard() throws Exception {
     Selector selector = new Selector().setRowId("row-6");
@@ -521,6 +523,33 @@ public class IndexManagerTest {
   }
 
   @Test
+  public void testFetchRowByRowIdBatch() throws Exception {
+    List<Selector> selectors = new ArrayList<Selector>();
+    selectors.add(new Selector().setRowId("row-1"));
+    selectors.add(new Selector().setRowId("row-2"));
+    List<FetchResult> fetchRowBatch = indexManager.fetchRowBatch(TABLE, selectors);
+    assertEquals(2, fetchRowBatch.size());
+    FetchResult fetchResult1 = fetchRowBatch.get(0);
+    assertNotNull(fetchResult1.rowResult.row);
+    Row row1 = newRow(
+        "row-1",
+        newRecord(FAMILY, "record-1", newColumn("testcol1", "value1"), newColumn("testcol2", "value2"),
+            newColumn("testcol3", "value3")));
+    row1.recordCount = 1;
+    assertEquals(row1, fetchResult1.rowResult.row);
+
+    FetchResult fetchResult2 = fetchRowBatch.get(1);
+    assertNotNull(fetchResult2.rowResult.row);
+    Row row2 = newRow(
+        "row-2",
+        newRecord(FAMILY, "record-2", newColumn("testcol1", "value4"), newColumn("testcol2", "value5"),
+            newColumn("testcol3", "value6")),
+        newRecord(FAMILY, "record-2B", newColumn("testcol2", "value234123"), newColumn("testcol3", "value234123")));
+    row2.recordCount = 2;
+    assertEquals(row2, fetchResult2.rowResult.row);
+  }
+
+  @Test
   public void testFetchRowByRowIdPaging() throws Exception {
     Selector selector = new Selector().setRowId("row-6").setStartRecord(0).setMaxRecordsToFetch(1);
     FetchResult fetchResult = new FetchResult();
@@ -818,7 +847,7 @@ public class IndexManagerTest {
     assertNotNull("Non-existent fields should not return null.", terms);
     assertEquals("The terms of non-existent fields should be empty.", 0, terms.size());
   }
-  
+
   @Test
   public void testMutationReplaceRow() throws Exception {
     RowMutation mutation = newRowMutation(
@@ -1014,7 +1043,7 @@ public class IndexManagerTest {
     }
     assertTrue("column 3 should be unmodified", foundUnmodifiedColumn);
   }
-  
+
   @Test
   public void testMutationUpdateRowReplaceExistingColumnsWhileDeletingAColumn() throws Exception {
     Column c1 = newColumn("testcol1", "value999");

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/416833be/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java b/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java
index 7fba16b..eb96e78 100644
--- a/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java
+++ b/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java
@@ -46,10 +46,12 @@ import org.apache.blur.thrift.generated.BlurQuery;
 import org.apache.blur.thrift.generated.BlurResult;
 import org.apache.blur.thrift.generated.BlurResults;
 import org.apache.blur.thrift.generated.ErrorType;
+import org.apache.blur.thrift.generated.FetchResult;
 import org.apache.blur.thrift.generated.Query;
 import org.apache.blur.thrift.generated.RecordMutation;
 import org.apache.blur.thrift.generated.RowMutation;
 import org.apache.blur.thrift.generated.Schema;
+import org.apache.blur.thrift.generated.Selector;
 import org.apache.blur.thrift.generated.TableDescriptor;
 import org.apache.blur.thrift.util.BlurThriftHelper;
 import org.apache.blur.utils.GCWatcher;
@@ -107,6 +109,8 @@ public class BlurClusterTest {
       KeeperException {
     testCreateTable();
     testLoadTable();
+    testQueryWithSelector();
+    testBatchFetch();
     testQueryCancel();
     testBackPressureViaQuery();
     testTestShardFailover();
@@ -163,6 +167,49 @@ public class BlurClusterTest {
     Schema schema = client.schema("test");
     assertFalse(schema.getFamilies().isEmpty());
   }
+  
+  private void testQueryWithSelector() throws BlurException, TException {
+    Iface client = getClient();
+    BlurQuery blurQueryRow = new BlurQuery();
+    Query queryRow = new Query();
+    queryRow.setQuery("test.test:value");
+    blurQueryRow.setQuery(queryRow);
+    blurQueryRow.setUseCacheIfPresent(false);
+    blurQueryRow.setCacheResult(false);
+    blurQueryRow.setSelector(new Selector());
+    
+    BlurResults resultsRow = client.query("test", blurQueryRow);
+//    assertRowResults(resultsRow);
+    assertEquals(100, resultsRow.getTotalResults());
+    
+    for (BlurResult blurResult : resultsRow.getResults()) {
+      System.out.println(blurResult);
+    }
+    
+    System.out.println();
+  }
+
+  public void testBatchFetch() throws BlurException, TException {
+    final Iface client = getClient();
+    List<String> terms = client.terms("test", null, "rowid", "", (short) 100);
+
+    List<Selector> selectors = new ArrayList<Selector>();
+    for (String s : terms) {
+      Selector selector = new Selector();
+      selector.setRowId(s);
+      selectors.add(selector);
+    }
+
+    List<FetchResult> fetchRowBatch = client.fetchRowBatch("test", selectors);
+    assertEquals(100, fetchRowBatch.size());
+
+    int i = 0;
+    for (FetchResult fetchResult : fetchRowBatch) {
+      assertEquals(terms.get(i), fetchResult.getRowResult().getRow().getId());
+      i++;
+    }
+
+  }
 
   public void testQueryCancel() throws BlurException, TException, InterruptedException {
     // This will make each collect in the collectors pause 250 ms per collect
@@ -317,12 +364,12 @@ public class BlurClusterTest {
 
   public void testTestShardFailover() throws BlurException, TException, InterruptedException, IOException,
       KeeperException {
-    
+
     System.out.println("===========================");
     System.out.println("===========================");
     System.out.println("===========================");
     System.out.println("===========================");
-    
+
     Iface client = getClient();
     int length = 100;
     BlurQuery blurQuery = new BlurQuery();


Mime
View raw message