incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [1/2] git commit: Fixes BLUR-79 BLUR-102 BLUR-125
Date Tue, 04 Jun 2013 02:13:38 GMT
Updated Branches:
  refs/heads/0.1.5 688b8c681 -> 9e31fcc64


Fixes BLUR-79 BLUR-102 BLUR-125


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/70a6d449
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/70a6d449
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/70a6d449

Branch: refs/heads/0.1.5
Commit: 70a6d449df0580f538a123a50b0a46191c5759b8
Parents: bfd3887
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Mon Jun 3 22:11:33 2013 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Mon Jun 3 22:11:33 2013 -0400

----------------------------------------------------------------------
 .../java/org/apache/blur/manager/IndexManager.java |   43 +++++--
 .../blur/manager/results/BlurResultIterable.java   |    7 +-
 .../manager/results/BlurResultIterableClient.java  |   48 +++++--
 .../results/BlurResultIterableMultiple.java        |    7 +-
 .../blur/manager/results/LazyBlurResult.java       |   47 +++++++
 .../blur/server/ShardServerEventHandler.java       |    8 +-
 .../apache/blur/thrift/BlurControllerServer.java   |  108 ++++++++++++---
 .../org/apache/blur/utils/BlurThriftRecord.java    |    2 +-
 .../main/java/org/apache/blur/utils/BlurUtil.java  |    7 +
 .../org/apache/blur/utils/RowDocumentUtil.java     |   11 +-
 .../org/apache/blur/thrift/BlurClusterTest.java    |   72 ++++++++---
 .../org/apache/blur/thrift/AbstractCommand.java    |   62 ++++++++-
 .../org/apache/blur/thrift/BlurClientManager.java  |   41 ++++--
 .../apache/blur/thrift/commands/BlurCommand.java   |    6 +-
 14 files changed, 373 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/70a6d449/src/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java b/src/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
index b21e3e4..2a08151 100644
--- a/src/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
@@ -19,10 +19,10 @@ package org.apache.blur.manager;
 import static org.apache.blur.metrics.MetricsConstants.BLUR;
 import static org.apache.blur.metrics.MetricsConstants.ORG_APACHE_BLUR;
 import static org.apache.blur.thrift.util.BlurThriftHelper.findRecordMutation;
-import static org.apache.blur.utils.BlurConstants.PRIME_DOC;
+import static org.apache.blur.utils.BlurConstants.*;
 import static org.apache.blur.utils.BlurConstants.RECORD_ID;
 import static org.apache.blur.utils.BlurConstants.ROW_ID;
-import static org.apache.blur.utils.RowDocumentUtil.getColumns;
+import static org.apache.blur.utils.RowDocumentUtil.getRecord;
 import static org.apache.blur.utils.RowDocumentUtil.getRow;
 
 import java.io.IOException;
@@ -174,7 +174,7 @@ public class IndexManager {
     String shard;
     try {
       if (selector.getLocationId() == null) {
-        //Not looking up by location id so we should resetSearchers.
+        // Not looking up by location id so we should resetSearchers.
         ShardServerContext.resetSearchers();
         populateSelector(table, selector);
       }
@@ -447,7 +447,15 @@ public class IndexManager {
     if (docId >= reader.maxDoc()) {
       throw new RuntimeException("Location id [" + locationId + "] with docId [" + docId + "] is not valid.");
     }
+
+    boolean returnIdsOnly = false;
+    if (selector.columnFamiliesToFetch.isEmpty() && selector.columnsToFetch.isEmpty()) {
+      // exit early
+      returnIdsOnly = true;
+    }
+
     Bits liveDocs = MultiFields.getLiveDocs(reader);
+    ResetableDocumentStoredFieldVisitor fieldVisitor = getFieldSelector(selector);
     if (selector.isRecordOnly()) {
       // select only the row for the given data or location id.
       if (liveDocs != null && !liveDocs.get(docId)) {
@@ -457,9 +465,10 @@ public class IndexManager {
       } else {
         fetchResult.exists = true;
         fetchResult.deleted = false;
-        reader.document(docId, getFieldSelector(selector));
-        Document document = reader.document(docId);
-        fetchResult.recordResult = getColumns(document);
+        reader.document(docId, fieldVisitor);
+        Document document = fieldVisitor.getDocument();
+        fieldVisitor.reset();
+        fetchResult.recordResult = getRecord(document);
         return;
       }
     } else {
@@ -471,9 +480,16 @@ public class IndexManager {
         fetchResult.exists = true;
         fetchResult.deleted = false;
         String rowId = getRowId(reader, docId);
-        List<Document> docs = BlurUtil.fetchDocuments(reader, new Term(ROW_ID, rowId), getFieldSelector(selector),
-            selector);
-        fetchResult.rowResult = new FetchRowResult(getRow(docs));
+        Term term = new Term(ROW_ID, rowId);
+        if (returnIdsOnly) {
+          int recordCount = BlurUtil.countDocuments(reader, term);
+          fetchResult.rowResult = new FetchRowResult();
+          fetchResult.rowResult.row = new Row(rowId, null, recordCount);
+        } else {
+          List<Document> docs = BlurUtil.fetchDocuments(reader, term, fieldVisitor,
+              selector);
+          fetchResult.rowResult = new FetchRowResult(getRow(docs));
+        }
         return;
       }
     }
@@ -513,6 +529,9 @@ public class IndexManager {
         if (PRIME_DOC.equals(fieldInfo.name)) {
           return StoredFieldVisitor.Status.NO;
         }
+        if (FAMILY.equals(fieldInfo.name)) {
+          return StoredFieldVisitor.Status.YES;
+        }
         if (selector.columnFamiliesToFetch == null && selector.columnsToFetch == null) {
           return StoredFieldVisitor.Status.YES;
         }
@@ -993,8 +1012,10 @@ public class IndexManager {
         searcher.setSimilarity(_indexServer.getSimilarity(_table));
         Query rewrite = searcher.rewrite((Query) _query.clone());
 
-        // BlurResultIterableSearcher will close searcher, if shard server context is null.
-        return new BlurResultIterableSearcher(_running, rewrite, _table, shard, searcher, _selector, _shardServerContext == null);
+        // BlurResultIterableSearcher will close searcher, if shard server
+        // context is null.
+        return new BlurResultIterableSearcher(_running, rewrite, _table, shard, searcher, _selector,
+            _shardServerContext == null);
       } finally {
         _queriesInternalMeter.mark();
         _status.deattachThread();

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/70a6d449/src/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterable.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterable.java b/src/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterable.java
index 8514ae2..517920a 100644
--- a/src/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterable.java
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterable.java
@@ -16,13 +16,12 @@ package org.apache.blur.manager.results;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-import java.io.IOException;
+import java.io.Closeable;
 import java.util.Map;
 
 import org.apache.blur.thrift.generated.BlurResult;
 
-
-public interface BlurResultIterable extends Iterable<BlurResult> {
+public interface BlurResultIterable extends Iterable<BlurResult>, Closeable {
 
   void skipTo(long skipTo);
 
@@ -30,6 +29,4 @@ public interface BlurResultIterable extends Iterable<BlurResult> {
 
   Map<String, Long> getShardInfo();
 
-  void close() throws IOException;
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/70a6d449/src/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterableClient.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterableClient.java b/src/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterableClient.java
index d6f62a0..9cf7dd7 100644
--- a/src/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterableClient.java
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterableClient.java
@@ -25,30 +25,35 @@ import java.util.concurrent.atomic.AtomicLongArray;
 
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
+import org.apache.blur.thrift.BlurClientManager;
+import org.apache.blur.thrift.Connection;
 import org.apache.blur.thrift.generated.Blur;
 import org.apache.blur.thrift.generated.BlurQuery;
 import org.apache.blur.thrift.generated.BlurResult;
 import org.apache.blur.thrift.generated.BlurResults;
 import org.apache.blur.thrift.generated.Blur.Client;
 
-
 public class BlurResultIterableClient implements BlurResultIterable {
 
   private static final Log LOG = LogFactory.getLog(BlurResultIterableClient.class);
 
-  private Map<String, Long> _shardInfo = new TreeMap<String, Long>();
-  private Client _client;
-  private String _table;
+  private final Map<String, Long> _shardInfo = new TreeMap<String, Long>();
+  private final Client _client;
+  private final String _table;
+  private final BlurQuery _originalQuery;
+  private final Connection _connection;
+  private final int _remoteFetchCount;
+  private final AtomicLongArray _facetCounts;
+
   private BlurResults _results;
-  private int _remoteFetchCount;
   private int _batch = 0;
   private long _totalResults;
   private long _skipTo;
-  private AtomicLongArray _facetCounts;
   private boolean _alreadyProcessed;
-  private BlurQuery _originalQuery;
 
-  public BlurResultIterableClient(Blur.Client client, String table, BlurQuery query, AtomicLongArray facetCounts, int remoteFetchCount) {
+  public BlurResultIterableClient(Connection connection, Blur.Client client, String table, BlurQuery query,
+      AtomicLongArray facetCounts, int remoteFetchCount) {
+    _connection = connection;
     _client = client;
     _table = table;
     _facetCounts = facetCounts;
@@ -57,13 +62,19 @@ public class BlurResultIterableClient implements BlurResultIterable {
     performSearch();
   }
 
+  public Client getClient() {
+    return _client;
+  }
+
   private void performSearch() {
     try {
       long cursor = _remoteFetchCount * _batch;
-      BlurQuery blurQuery = new BlurQuery(_originalQuery.simpleQuery, _originalQuery.expertQuery, _originalQuery.facets, null, false, _originalQuery.useCacheIfPresent, cursor,
-          _remoteFetchCount, _originalQuery.minimumNumberOfResults, _originalQuery.maxQueryTime, _originalQuery.uuid, _originalQuery.userContext, _originalQuery.cacheResult,
-          _originalQuery.startTime, _originalQuery.modifyFileCaches);
-      _results = _client.query(_table, blurQuery);
+      BlurQuery blurQuery = new BlurQuery(_originalQuery.simpleQuery, _originalQuery.expertQuery,
+          _originalQuery.facets, null, false, _originalQuery.useCacheIfPresent, cursor, _remoteFetchCount,
+          _originalQuery.minimumNumberOfResults, _originalQuery.maxQueryTime, _originalQuery.uuid,
+          _originalQuery.userContext, _originalQuery.cacheResult, _originalQuery.startTime,
+          _originalQuery.modifyFileCaches);
+      _results = makeLazy(_client.query(_table, blurQuery));
       addFacets();
       _totalResults = _results.totalResults;
       _shardInfo.putAll(_results.shardInfo);
@@ -74,6 +85,17 @@ public class BlurResultIterableClient implements BlurResultIterable {
     }
   }
 
+  private BlurResults makeLazy(BlurResults results) {
+    List<BlurResult> list = results.results;
+    for (int i = 0; i < list.size(); i++) {
+      BlurResult blurResult = list.get(i);
+      if (blurResult != null) {
+        list.set(i, new LazyBlurResult(blurResult, _client));
+      }
+    }
+    return results;
+  }
+
   private void addFacets() {
     if (!_alreadyProcessed) {
       List<Long> counts = _results.facetCounts;
@@ -144,6 +166,6 @@ public class BlurResultIterableClient implements BlurResultIterable {
 
   @Override
   public void close() throws IOException {
-    // nothing
+    BlurClientManager.returnClient(_connection, _client);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/70a6d449/src/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterableMultiple.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterableMultiple.java b/src/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterableMultiple.java
index 625c663..1a911c3 100644
--- a/src/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterableMultiple.java
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterableMultiple.java
@@ -24,11 +24,16 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
 import org.apache.blur.thrift.generated.BlurResult;
 import org.apache.blur.utils.BlurConstants;
+import org.apache.hadoop.io.IOUtils;
 
 
 public class BlurResultIterableMultiple implements BlurResultIterable {
+  
+  private static final Log LOG = LogFactory.getLog(BlurResultIterableMultiple.class);
 
   private long totalResults;
   private Map<String, Long> shardInfo = new TreeMap<String, Long>();
@@ -108,7 +113,7 @@ public class BlurResultIterableMultiple implements BlurResultIterable {
   @Override
   public void close() throws IOException {
     for (BlurResultIterable it : results) {
-      it.close();
+      IOUtils.cleanup(LOG, it);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/70a6d449/src/blur-core/src/main/java/org/apache/blur/manager/results/LazyBlurResult.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/results/LazyBlurResult.java b/src/blur-core/src/main/java/org/apache/blur/manager/results/LazyBlurResult.java
new file mode 100644
index 0000000..f98fae3
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/results/LazyBlurResult.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.blur.manager.results;
+
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thrift.generated.Blur.Client;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.BlurResult;
+import org.apache.blur.thrift.generated.FetchResult;
+import org.apache.blur.thrift.generated.Selector;
+
+/**
+ * The {@link LazyBlurResult} adds a method to fetch the result with the client
+ * that was used to execute the query.
+ */
+@SuppressWarnings("serial")
+public class LazyBlurResult extends BlurResult {
+
+  private final Client _client;
+
+  public LazyBlurResult(BlurResult result, Client client) {
+    super(result);
+    _client = client;
+  }
+
+  public FetchResult fetchRow(String table, Selector selector) throws BlurException, TException {
+    synchronized (_client) {
+      return _client.fetchRow(table, selector);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/70a6d449/src/blur-core/src/main/java/org/apache/blur/server/ShardServerEventHandler.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/server/ShardServerEventHandler.java b/src/blur-core/src/main/java/org/apache/blur/server/ShardServerEventHandler.java
index fd0018d..d8eb7cb 100644
--- a/src/blur-core/src/main/java/org/apache/blur/server/ShardServerEventHandler.java
+++ b/src/blur-core/src/main/java/org/apache/blur/server/ShardServerEventHandler.java
@@ -33,25 +33,25 @@ public class ShardServerEventHandler implements TServerEventHandler {
 
   @Override
   public void preServe() {
-    LOG.info("preServe");
+    LOG.debug("preServe");
   }
 
   @Override
   public ServerContext createContext(TProtocol input, TProtocol output) {
-    LOG.info("Client connected");
+    LOG.debug("Client connected");
     return new ShardServerContext();
   }
 
   @Override
   public void deleteContext(ServerContext serverContext, TProtocol input, TProtocol output) {
-    LOG.info("Client disconnected");
+    LOG.debug("Client disconnected");
     ShardServerContext context = (ShardServerContext) serverContext;
     context.close();
   }
 
   @Override
   public void processContext(ServerContext serverContext, TTransport inputTransport, TTransport outputTransport) {
-    LOG.info("Method called");
+    LOG.debug("Method called");
     ShardServerContext context = (ShardServerContext) serverContext;
     ShardServerContext.registerContextForCall(context);
   }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/70a6d449/src/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java b/src/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
index 79c9b61..20837b3 100644
--- a/src/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
+++ b/src/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
@@ -22,13 +22,16 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -46,6 +49,7 @@ import org.apache.blur.manager.clusterstatus.ZookeeperPathConstants;
 import org.apache.blur.manager.indexserver.DistributedLayoutManager;
 import org.apache.blur.manager.results.BlurResultIterable;
 import org.apache.blur.manager.results.BlurResultIterableClient;
+import org.apache.blur.manager.results.LazyBlurResult;
 import org.apache.blur.manager.results.MergerBlurResultIterable;
 import org.apache.blur.manager.stats.MergerTableStats;
 import org.apache.blur.manager.status.MergerQueryStatus;
@@ -57,6 +61,7 @@ import org.apache.blur.thrift.generated.Blur.Iface;
 import org.apache.blur.thrift.generated.BlurException;
 import org.apache.blur.thrift.generated.BlurQuery;
 import org.apache.blur.thrift.generated.BlurQueryStatus;
+import org.apache.blur.thrift.generated.BlurResult;
 import org.apache.blur.thrift.generated.BlurResults;
 import org.apache.blur.thrift.generated.FetchResult;
 import org.apache.blur.thrift.generated.RowMutation;
@@ -83,19 +88,21 @@ public class BlurControllerServer extends TableAdmin implements Iface
{
 
   public static abstract class BlurClient {
     public abstract <T> T execute(String node, BlurCommand<T> command, int maxRetries,
long backOffTime,
-        long maxBackOffTime) throws Exception;
+        long maxBackOffTime) throws BlurException, TException, IOException;
   }
 
   public static class BlurClientRemote extends BlurClient {
     @Override
     public <T> T execute(String node, BlurCommand<T> command, int maxRetries,
long backOffTime, long maxBackOffTime)
-        throws Exception {
+        throws BlurException, TException, IOException {
       return BlurClientManager.execute(node, command, maxRetries, backOffTime, maxBackOffTime);
     }
   }
 
   private static final String CONTROLLER_THREAD_POOL = "controller-thread-pool";
   private static final Log LOG = LogFactory.getLog(BlurControllerServer.class);
+  private static final Map<String, Set<String>> EMPTY_MAP = new HashMap<String,
Set<String>>();
+  private static final Set<String> EMPTY_SET = new HashSet<String>();
 
   private ExecutorService _executor;
   private AtomicReference<Map<String, Map<String, String>>> _shardServerLayout
= new AtomicReference<Map<String, Map<String, String>>>(
@@ -292,7 +299,6 @@ public class BlurControllerServer extends TableAdmin implements Iface
{
 
   @Override
   public BlurResults query(final String table, final BlurQuery blurQuery) throws BlurException,
TException {
-    // @TODO make this faster
     checkTable(table);
     String cluster = _clusterStatus.getCluster(true, table);
     _queryChecker.checkQuery(blurQuery);
@@ -303,25 +309,49 @@ public class BlurControllerServer extends TableAdmin implements Iface
{
         final AtomicLongArray facetCounts = BlurUtil.getAtomicLongArraySameLengthAsList(blurQuery.facets);
 
         BlurQuery original = new BlurQuery(blurQuery);
-        
+
         BlurUtil.setStartTime(original);
 
         Selector selector = blurQuery.getSelector();
+        if (selector == null) {
+          selector = new Selector();
+          selector.setColumnFamiliesToFetch(EMPTY_SET);
+          selector.setColumnsToFetch(EMPTY_MAP);
+          if (!blurQuery.simpleQuery.superQueryOn) {
+            selector.setRecordOnly(true);
+          }
+        }
         blurQuery.setSelector(null);
 
-        BlurResultIterable hitsIterable = scatterGather(getCluster(table), new BlurCommand<BlurResultIterable>()
{
+        BlurCommand<BlurResultIterable> command = new BlurCommand<BlurResultIterable>()
{
+          @Override
+          public BlurResultIterable call(Client client, Connection connection) throws BlurException,
TException {
+            return new BlurResultIterableClient(connection, client, table, blurQuery, facetCounts,
_remoteFetchCount);
+          }
+
           @Override
           public BlurResultIterable call(Client client) throws BlurException, TException
{
-            return new BlurResultIterableClient(client, table, blurQuery, facetCounts, _remoteFetchCount);
+            throw new RuntimeException("Won't be called.");
+          }
+        };
+
+        command.setDetachClient(true);
+
+        MergerBlurResultIterable merger = new MergerBlurResultIterable(blurQuery);
+        BlurResultIterable hitsIterable = null;
+        try {
+          hitsIterable = scatterGather(getCluster(table), command, merger);
+          BlurResults results = convertToBlurResults(hitsIterable, blurQuery, facetCounts,
_executor, selector, table);
+          if (!validResults(results, shardCount, blurQuery)) {
+            BlurClientManager.sleep(_defaultDelay, _maxDefaultDelay, retries, _maxDefaultRetries);
+            continue OUTER;
+          }
+          return results;
+        } finally {
+          if (hitsIterable != null) {
+            hitsIterable.close();
           }
-        }, new MergerBlurResultIterable(blurQuery));
-        BlurResults results = BlurUtil.convertToHits(hitsIterable, blurQuery, facetCounts,
_executor, selector, this,
-            table);
-        if (!validResults(results, shardCount, blurQuery)) {
-          BlurClientManager.sleep(_defaultDelay, _maxDefaultDelay, retries, _maxDefaultRetries);
-          continue OUTER;
         }
-        return results;
       } catch (Exception e) {
         LOG.error("Unknown error during search of [table={0},blurQuery={1}]", e, table, blurQuery);
         throw new BException("Unknown error during search of [table={0},blurQuery={1}]",
e, table, blurQuery);
@@ -330,6 +360,52 @@ public class BlurControllerServer extends TableAdmin implements Iface
{
     throw new BlurException("Query could not be completed.", null);
   }
 
+  public BlurResults convertToBlurResults(BlurResultIterable hitsIterable, BlurQuery query,
+      AtomicLongArray facetCounts, ExecutorService executor, Selector selector, final String
table)
+      throws InterruptedException, ExecutionException {
+    BlurResults results = new BlurResults();
+    results.setTotalResults(hitsIterable.getTotalResults());
+    results.setShardInfo(hitsIterable.getShardInfo());
+    if (query.minimumNumberOfResults > 0) {
+      hitsIterable.skipTo(query.start);
+      int count = 0;
+      Iterator<BlurResult> iterator = hitsIterable.iterator();
+      while (iterator.hasNext() && count < query.fetch) {
+        results.addToResults(iterator.next());
+        count++;
+      }
+    }
+    if (results.results == null) {
+      results.results = new ArrayList<BlurResult>();
+    }
+    if (facetCounts != null) {
+      results.facetCounts = BlurUtil.toList(facetCounts);
+    }
+    if (selector != null) {
+      List<Future<FetchResult>> futures = new ArrayList<Future<FetchResult>>();
+      for (int i = 0; i < results.results.size(); i++) {
+        final LazyBlurResult result = (LazyBlurResult) results.results.get(i);
+        final Selector s = new Selector(selector);
+        s.setLocationId(result.locationId);
+        futures.add(executor.submit(new Callable<FetchResult>() {
+          @Override
+          public FetchResult call() throws Exception {
+            return result.fetchRow(table, s);
+          }
+        }));
+      }
+      for (int i = 0; i < results.results.size(); i++) {
+        Future<FetchResult> future = futures.get(i);
+        BlurResult result = results.results.get(i);
+        result.setFetchResult(future.get());
+        result.setLocationId(null);
+      }
+    }
+    results.query = query;
+    results.query.selector = selector;
+    return results;
+  }
+
   private boolean validResults(BlurResults results, int shardCount, BlurQuery query) {
     if (results.totalResults >= query.minimumNumberOfResults) {
       return true;
@@ -618,11 +694,9 @@ public class BlurControllerServer extends TableAdmin implements Iface
{
   private <R> R scatterGather(String cluster, final BlurCommand<R> command, Merger<R>
merger) throws Exception {
     return ForkJoin.execute(_executor, _clusterStatus.getOnlineShardServers(true, cluster),
         new ParallelCall<String, R>() {
-          @SuppressWarnings("unchecked")
           @Override
-          public R call(String hostnamePort) throws Exception {
-            return _client.execute(hostnamePort, (BlurCommand<R>) command.clone(),
_maxDefaultRetries, _defaultDelay,
-                _maxDefaultDelay);
+          public R call(String hostnamePort) throws BlurException, TException, IOException
{
+            return _client.execute(hostnamePort, command.clone(), _maxDefaultRetries, _defaultDelay,
_maxDefaultDelay);
           }
         }).merge(merger);
   }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/70a6d449/src/blur-core/src/main/java/org/apache/blur/utils/BlurThriftRecord.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/utils/BlurThriftRecord.java b/src/blur-core/src/main/java/org/apache/blur/utils/BlurThriftRecord.java
index d8dcefa..6323af5 100644
--- a/src/blur-core/src/main/java/org/apache/blur/utils/BlurThriftRecord.java
+++ b/src/blur-core/src/main/java/org/apache/blur/utils/BlurThriftRecord.java
@@ -40,7 +40,7 @@ public class BlurThriftRecord extends Record implements ReaderBlurRecord
{
 
   @Override
   public void setRowIdStr(String rowId) {
-    // setRowIdStr(rowId);
+    // do nothing
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/70a6d449/src/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java b/src/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java
index 2323eae..a421a2c 100644
--- a/src/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java
+++ b/src/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java
@@ -574,6 +574,13 @@ public class BlurUtil {
     recordMutation.setRecordMutationType(RecordMutationType.REPLACE_ENTIRE_RECORD);
     return recordMutation;
   }
+  
+  public static int countDocuments(IndexReader reader, Term term) throws IOException {
+    TermQuery query = new TermQuery(term);
+    IndexSearcher indexSearcher = new IndexSearcher(reader);
+    TopDocs topDocs = indexSearcher.search(query, 1);
+    return topDocs.totalHits;
+  }
 
   /**
    * NOTE: This is a potentially dangerous call, it will return all the

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/70a6d449/src/blur-core/src/main/java/org/apache/blur/utils/RowDocumentUtil.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/utils/RowDocumentUtil.java b/src/blur-core/src/main/java/org/apache/blur/utils/RowDocumentUtil.java
index 34f1955..4f09fe2 100644
--- a/src/blur-core/src/main/java/org/apache/blur/utils/RowDocumentUtil.java
+++ b/src/blur-core/src/main/java/org/apache/blur/utils/RowDocumentUtil.java
@@ -16,7 +16,7 @@ package org.apache.blur.utils;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-import static org.apache.blur.utils.BlurConstants.RECORD_ID;
+import static org.apache.blur.utils.BlurConstants.*;
 import static org.apache.blur.utils.BlurConstants.ROW_ID;
 import static org.apache.blur.utils.BlurConstants.SEP;
 
@@ -31,7 +31,7 @@ import org.apache.lucene.index.IndexableField;
 
 public class RowDocumentUtil {
 
-  public static FetchRecordResult getColumns(Document document) {
+  public static FetchRecordResult getRecord(Document document) {
     FetchRecordResult result = new FetchRecordResult();
     BlurThriftRecord record = new BlurThriftRecord();
     String rowId = readRecord(document, record);
@@ -69,26 +69,23 @@ public class RowDocumentUtil {
 
   public static String readRecord(Document document, ReaderBlurRecord reader) {
     String rowId = null;
-    String family = null;
     for (IndexableField field : document.getFields()) {
       if (field.name().equals(ROW_ID)) {
         rowId = field.stringValue();
       } else if (field.name().equals(RECORD_ID)) {
         reader.setRecordIdStr(field.stringValue());
+      } else if (field.name().equals(FAMILY)) {
+        reader.setFamilyStr(field.stringValue());
       } else {
         String name = field.name();
         int index = name.indexOf(SEP);
         if (index < 0) {
           continue;
-        } else if (family == null) {
-          family = name.substring(0, index);
         }
         name = name.substring(index + 1);
         reader.addColumn(name, field.stringValue());
       }
     }
-    reader.setFamilyStr(family);
-    reader.setRowIdStr(rowId);
     return rowId;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/70a6d449/src/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java b/src/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java
index 5ae4911..38475a8 100644
--- a/src/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java
+++ b/src/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java
@@ -19,6 +19,8 @@ package org.apache.blur.thrift;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 
 import java.io.File;
 import java.io.IOException;
@@ -34,6 +36,7 @@ import org.apache.blur.thrift.generated.Blur;
 import org.apache.blur.thrift.generated.Blur.Iface;
 import org.apache.blur.thrift.generated.BlurException;
 import org.apache.blur.thrift.generated.BlurQuery;
+import org.apache.blur.thrift.generated.BlurResult;
 import org.apache.blur.thrift.generated.BlurResults;
 import org.apache.blur.thrift.generated.RecordMutation;
 import org.apache.blur.thrift.generated.RowMutation;
@@ -82,7 +85,7 @@ public class BlurClusterTest {
   public static void shutdownCluster() {
     MiniCluster.shutdownBlurCluster();
   }
-  
+
   private Iface getClient() {
     return BlurClient.getClient(MiniCluster.getControllerConnectionStr());
   }
@@ -106,7 +109,8 @@ public class BlurClusterTest {
     List<RowMutation> mutations = new ArrayList<RowMutation>();
     for (int i = 0; i < length; i++) {
       String rowId = UUID.randomUUID().toString();
-      RecordMutation mutation = BlurThriftHelper.newRecordMutation("test", rowId, BlurThriftHelper.newColumn("test", "value"));
+      RecordMutation mutation = BlurThriftHelper.newRecordMutation("test", rowId,
+          BlurThriftHelper.newColumn("test", "value"));
       RowMutation rowMutation = BlurThriftHelper.newRowMutation("test", rowId, mutation);
       rowMutation.setWaitToBeVisible(true);
       mutations.add(rowMutation);
@@ -115,16 +119,27 @@ public class BlurClusterTest {
     client.mutateBatch(mutations);
     long e = System.nanoTime();
     System.out.println("mutateBatch took [" + (e - s) / 1000000.0 + "]");
-    BlurQuery blurQuery = new BlurQuery();
-    SimpleQuery simpleQuery = new SimpleQuery();
-    simpleQuery.setQueryStr("test.test:value");
-    blurQuery.setSimpleQuery(simpleQuery);
-    BlurResults results = client.query("test", blurQuery);
-    assertEquals(length, results.getTotalResults());
+    BlurQuery blurQueryRow = new BlurQuery();
+    SimpleQuery simpleQueryRow = new SimpleQuery();
+    simpleQueryRow.setQueryStr("test.test:value");
+    blurQueryRow.setSimpleQuery(simpleQueryRow);
+    BlurResults resultsRow = client.query("test", blurQueryRow);
+    assertRowResults(resultsRow);
+    assertEquals(length, resultsRow.getTotalResults());
+
+    BlurQuery blurQueryRecord = new BlurQuery();
+    SimpleQuery simpleQueryRecord = new SimpleQuery();
+    simpleQueryRecord.superQueryOn = false;
+    simpleQueryRecord.setQueryStr("test.test:value");
+    blurQueryRecord.setSimpleQuery(simpleQueryRecord);
+    BlurResults resultsRecord = client.query("test", blurQueryRecord);
+    assertRecordResults(resultsRecord);
+    assertEquals(length, resultsRecord.getTotalResults());
   }
-  
+
   @Test
-  public void testTestShardFailover() throws BlurException, TException, InterruptedException, IOException, KeeperException {
+  public void testTestShardFailover() throws BlurException, TException, InterruptedException, IOException,
+      KeeperException {
     Iface client = getClient();
     int length = 100;
     BlurQuery blurQuery = new BlurQuery();
@@ -134,17 +149,40 @@ public class BlurClusterTest {
     blurQuery.setSimpleQuery(simpleQuery);
     BlurResults results1 = client.query("test", blurQuery);
     assertEquals(length, results1.getTotalResults());
-    
+    assertRowResults(results1);
+
     MiniCluster.killShardServer(1);
-    
-    //make sure the WAL syncs
+
+    // make sure the WAL syncs
     Thread.sleep(TimeUnit.SECONDS.toMillis(1));
-    
-    //This should block until shards have failed over
+
+    // This should block until shards have failed over
     client.shardServerLayout("test");
-    
+
     assertEquals(length, client.query("test", blurQuery).getTotalResults());
-    
+
+  }
+
+  private void assertRowResults(BlurResults results) {
+    for (BlurResult result : results.getResults()) {
+      assertNull(result.locationId);
+      assertNull(result.fetchResult.recordResult);
+      assertNull(result.fetchResult.rowResult.row.records);
+      assertNotNull(result.fetchResult.rowResult.row.id);
+    }
+  }
+
+  private void assertRecordResults(BlurResults results) {
+    for (BlurResult result : results.getResults()) {
+      assertNull(result.locationId);
+      assertNotNull(result.fetchResult.recordResult);
+      assertNotNull(result.fetchResult.recordResult.rowid);
+      assertNotNull(result.fetchResult.recordResult.record.recordId);
+      assertNotNull(result.fetchResult.recordResult.record.family);
+      assertNull("Not null [" + result.fetchResult.recordResult.record.columns + "]",
+          result.fetchResult.recordResult.record.columns);
+      assertNull(result.fetchResult.rowResult);
+    }
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/70a6d449/src/blur-thrift/src/main/java/org/apache/blur/thrift/AbstractCommand.java
----------------------------------------------------------------------
diff --git a/src/blur-thrift/src/main/java/org/apache/blur/thrift/AbstractCommand.java b/src/blur-thrift/src/main/java/org/apache/blur/thrift/AbstractCommand.java
index 0f6ed3f..1a3cc8e 100644
--- a/src/blur-thrift/src/main/java/org/apache/blur/thrift/AbstractCommand.java
+++ b/src/blur-thrift/src/main/java/org/apache/blur/thrift/AbstractCommand.java
@@ -19,12 +19,68 @@ package org.apache.blur.thrift;
 import org.apache.blur.thirdparty.thrift_0_9_0.TException;
 import org.apache.blur.thrift.generated.BlurException;
 
-
 public abstract class AbstractCommand<CLIENT, T> implements Cloneable {
+
+  private boolean detachClient = false;
+
+  /**
+   * Reads if this command is to detach the client from the pool or not. If
+   * detach is set to true, then the user of the call needs to return the client
+   * to the pool by calling returnClient on the {@link BlurClientManager}.
+   * 
+   * @return the boolean.
+   */
+  public boolean isDetachClient() {
+    return detachClient;
+  }
+
+  /**
+   * Sets the attribute of detach client.
+   * 
+   * @param detachClient
+   *          the boolean value.
+   */
+  public void setDetachClient(boolean detachClient) {
+    this.detachClient = detachClient;
+  }
+
+  /**
+   * If this method is implemented then the call(CLIENT client) method is not
+   * called. This allows the command to gain access to the {@link Connection}
+   * object that is not normally needed. Usually used in conjunction with the
+   * detachClient attribute.
+   * 
+   * @param client
+   *          the client.
+   * @param connection
+   *          the connection object.
+   * @return object.
+   * @throws BlurException
+   * @throws TException
+   */
+  public T call(CLIENT client, Connection connection) throws BlurException, TException {
+    return call(client);
+  }
+
+  /**
+   * Abstract method that will be executed with a CLIENT object. And it will be
+   * retried if a {@link TException} is throw (that type of exception is assumed
+   * to be a problem with the connection to the remote system).
+   * 
+   * @param client the client.
+   * @return object.
+   * @throws BlurException
+   * @throws TException
+   */
   public abstract T call(CLIENT client) throws BlurException, TException;
 
+  @SuppressWarnings("unchecked")
   @Override
-  public Object clone() throws CloneNotSupportedException {
-    return super.clone();
+  public AbstractCommand<CLIENT, T> clone() {
+    try {
+      return (AbstractCommand<CLIENT, T>) super.clone();
+    } catch (CloneNotSupportedException e) {
+      throw new RuntimeException(e);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/70a6d449/src/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClientManager.java
----------------------------------------------------------------------
diff --git a/src/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClientManager.java b/src/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClientManager.java
index 87889ee..4e4c40c 100644
--- a/src/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClientManager.java
+++ b/src/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClientManager.java
@@ -49,7 +49,6 @@ import org.apache.blur.thrift.generated.Blur;
 import org.apache.blur.thrift.generated.BlurException;
 import org.apache.blur.thrift.generated.Blur.Client;
 
-
 public class BlurClientManager {
 
   private static final Object NULL = new Object();
@@ -111,16 +110,18 @@ public class BlurClientManager {
     return false;
   }
 
-  public static <CLIENT, T> T execute(Connection connection, AbstractCommand<CLIENT, T> command) throws BlurException, TException, IOException {
+  public static <CLIENT, T> T execute(Connection connection, AbstractCommand<CLIENT, T> command) throws BlurException,
+      TException, IOException {
     return execute(connection, command, MAX_RETRIES, BACK_OFF_TIME, MAX_BACK_OFF_TIME);
   }
 
-  public static <CLIENT, T> T execute(Connection connection, AbstractCommand<CLIENT, T> command, int maxRetries, long backOffTime, long maxBackOffTime) throws BlurException,
-      TException, IOException {
+  public static <CLIENT, T> T execute(Connection connection, AbstractCommand<CLIENT, T> command, int maxRetries,
+      long backOffTime, long maxBackOffTime) throws BlurException, TException, IOException {
     return execute(Arrays.asList(connection), command, maxRetries, backOffTime, maxBackOffTime);
   }
 
-  public static <CLIENT, T> T execute(List<Connection> connections, AbstractCommand<CLIENT, T> command) throws BlurException, TException, IOException {
+  public static <CLIENT, T> T execute(List<Connection> connections, AbstractCommand<CLIENT, T> command)
+      throws BlurException, TException, IOException {
     return execute(connections, command, MAX_RETRIES, BACK_OFF_TIME, MAX_BACK_OFF_TIME);
   }
 
@@ -132,8 +133,8 @@ public class BlurClientManager {
   }
 
   @SuppressWarnings("unchecked")
-  public static <CLIENT, T> T execute(List<Connection> connections, AbstractCommand<CLIENT, T> command, int maxRetries, long backOffTime, long maxBackOffTime)
-      throws BlurException, TException, IOException {
+  public static <CLIENT, T> T execute(List<Connection> connections, AbstractCommand<CLIENT, T> command, int maxRetries,
+      long backOffTime, long maxBackOffTime) throws BlurException, TException, IOException {
     LocalResources localResources = new LocalResources();
     AtomicReference<Client> client = localResources.client;
     Random random = localResources.random;
@@ -163,8 +164,13 @@ public class BlurClientManager {
           }
         }
         try {
-          T result = command.call((CLIENT) client.get());
+          T result = command.call((CLIENT) client.get(), connection);
           allBad = false;
+          if (command.isDetachClient()) {
+            // if the is detach client is set then the command will return the
+            // client to the pool.
+            client.set(null);
+          }
           return result;
         } catch (RuntimeException e) {
           Throwable cause = e.getCause();
@@ -210,8 +216,9 @@ public class BlurClientManager {
     return badConnections.containsKey(connection);
   }
 
-  private static <CLIENT, T> boolean handleError(Connection connection, AtomicReference<Blur.Client> client, AtomicInteger retries, AbstractCommand<CLIENT, T> command,
-      Exception e, int maxRetries, long backOffTime, long maxBackOffTime) {
+  private static <CLIENT, T> boolean handleError(Connection connection, AtomicReference<Blur.Client> client,
+      AtomicInteger retries, AbstractCommand<CLIENT, T> command, Exception e, int maxRetries, long backOffTime,
+      long maxBackOffTime) {
     if (client.get() != null) {
       trashConnections(connection, client);
       markBadConnection(connection);
@@ -221,7 +228,8 @@ public class BlurClientManager {
       LOG.error("No more retries [{0}] out of [{1}]", retries, maxRetries);
       return true;
     }
-    LOG.error("Retrying call [{0}] retry [{1}] out of [{2}] message [{3}]", command, retries.get(), maxRetries, e.getMessage());
+    LOG.error("Retrying call [{0}] retry [{1}] out of [{2}] message [{3}]", command, retries.get(), maxRetries,
+        e.getMessage());
     sleep(backOffTime, maxBackOffTime, retries.get(), maxRetries);
     retries.incrementAndGet();
     return false;
@@ -238,8 +246,8 @@ public class BlurClientManager {
     }
   }
 
-  public static <CLIENT, T> T execute(String connectionStr, AbstractCommand<CLIENT, T> command, int maxRetries, long backOffTime, long maxBackOffTime) throws BlurException,
-      TException, IOException {
+  public static <CLIENT, T> T execute(String connectionStr, AbstractCommand<CLIENT, T> command, int maxRetries,
+      long backOffTime, long maxBackOffTime) throws BlurException, TException, IOException {
     return execute(getConnections(connectionStr), command, maxRetries, backOffTime, maxBackOffTime);
   }
 
@@ -259,15 +267,16 @@ public class BlurClientManager {
     return Arrays.asList(new Connection(connectionStr));
   }
 
-  public static <CLIENT, T> T execute(String connectionStr, AbstractCommand<CLIENT, T> command) throws BlurException, TException, IOException {
+  public static <CLIENT, T> T execute(String connectionStr, AbstractCommand<CLIENT, T> command) throws BlurException,
+      TException, IOException {
     return execute(getConnections(connectionStr), command);
   }
 
-  private static void returnClient(Connection connection, AtomicReference<Blur.Client> client) {
+  public static void returnClient(Connection connection, AtomicReference<Blur.Client> client) {
     returnClient(connection, client.get());
   }
 
-  private static void returnClient(Connection connection, Blur.Client client) {
+  public static void returnClient(Connection connection, Blur.Client client) {
     try {
       clientPool.get(connection).put(client);
     } catch (InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/70a6d449/src/blur-thrift/src/main/java/org/apache/blur/thrift/commands/BlurCommand.java
----------------------------------------------------------------------
diff --git a/src/blur-thrift/src/main/java/org/apache/blur/thrift/commands/BlurCommand.java b/src/blur-thrift/src/main/java/org/apache/blur/thrift/commands/BlurCommand.java
index 16fdca9..72f1f5f 100644
--- a/src/blur-thrift/src/main/java/org/apache/blur/thrift/commands/BlurCommand.java
+++ b/src/blur-thrift/src/main/java/org/apache/blur/thrift/commands/BlurCommand.java
@@ -19,7 +19,11 @@ package org.apache.blur.thrift.commands;
 import org.apache.blur.thrift.AbstractCommand;
 import org.apache.blur.thrift.generated.Blur;
 
-
 public abstract class BlurCommand<T> extends AbstractCommand<Blur.Client, T> {
 
+  @Override
+  public BlurCommand<T> clone() {
+    return (BlurCommand<T>) super.clone();
+  }
+
 }


Mime
View raw message