lucene-commits mailing list archives

From mikemcc...@apache.org
Subject svn commit: r1596817 [4/5] - in /lucene/dev/branches/lucene5675: ./ dev-tools/ dev-tools/idea/lucene/spatial/ dev-tools/idea/solr/contrib/analysis-extras/ dev-tools/scripts/ lucene/ lucene/codecs/ lucene/codecs/src/java/org/apache/lucene/codecs/memory/...
Date Thu, 22 May 2014 11:38:49 GMT
Modified: lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java?rev=1596817&r1=1596816&r2=1596817&view=diff
==============================================================================
--- lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java (original)
+++ lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java Thu May 22 11:38:47 2014
@@ -56,9 +56,11 @@ import org.apache.solr.common.util.Named
 import org.apache.solr.common.util.SimpleOrderedMap;
 import org.apache.solr.common.util.StrUtils;
 import org.apache.solr.handler.component.ShardHandler;
+import org.apache.solr.handler.component.ShardHandlerFactory;
 import org.apache.solr.handler.component.ShardRequest;
 import org.apache.solr.handler.component.ShardResponse;
 import org.apache.solr.update.SolrIndexSplitter;
+import org.apache.solr.util.DefaultSolrThreadFactory;
 import org.apache.solr.util.stats.Snapshot;
 import org.apache.solr.util.stats.Timer;
 import org.apache.solr.util.stats.TimerContext;
@@ -74,11 +76,15 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.solr.cloud.Assign.Node;
@@ -118,8 +124,6 @@ public class OverseerCollectionProcessor
 
   public static final String DELETESHARD = "deleteshard";
 
-  public static final String REQUESTSTATUS = "status";
-
   public static final String ROUTER = "router";
 
   public static final String SHARDS_PROP = "shards";
@@ -138,6 +142,8 @@ public class OverseerCollectionProcessor
 
   public static final String COLL_PROP_PREFIX = "property.";
 
+  public int maxParallelThreads = 10;
+
   public static final Set<String> KNOWN_CLUSTER_PROPS = ImmutableSet.of(ZkStateReader.LEGACY_CLOUD, ZkStateReader.URL_SCHEME);
 
   public static final Map<String,Object> COLL_PROPS = ZkNodeProps.makeMap(
@@ -146,8 +152,7 @@ public class OverseerCollectionProcessor
       MAX_SHARDS_PER_NODE, "1" );
 
 
-  // TODO: use from Overseer?
-  private static final String QUEUE_OPERATION = "operation";
+  public ExecutorService tpe;
   
   private static Logger log = LoggerFactory
       .getLogger(OverseerCollectionProcessor.class);
@@ -156,10 +161,20 @@ public class OverseerCollectionProcessor
   private DistributedMap runningMap;
   private DistributedMap completedMap;
   private DistributedMap failureMap;
-  
+
+  // Set of all currently running tasks, keyed on the zk id of the task.
+  final private Set<String> runningTasks;
+
+  // Set that tracks collections that are currently being processed by a running task.
+  // This is used for handling mutual exclusion of the tasks.
+  final private Set<String> collectionWip;
+
+  // Map of completed tasks, used to clean up the work-queue in zk.
+  final private HashMap<String, QueueEvent> completedTasks;
+
   private String myId;
 
-  private ShardHandler shardHandler;
+  private final ShardHandlerFactory shardHandlerFactory;
 
   private String adminPath;
 
@@ -169,13 +184,22 @@ public class OverseerCollectionProcessor
 
   private Overseer.Stats stats;
 
-  public OverseerCollectionProcessor(ZkStateReader zkStateReader, String myId, ShardHandler shardHandler, String adminPath, Overseer.Stats stats) {
-    this(zkStateReader, myId, shardHandler, adminPath, stats, Overseer.getCollectionQueue(zkStateReader.getZkClient(), stats),
+  // Set of tasks that have been picked up for processing but not cleaned up from zk work-queue.
+  // It may contain tasks that have completed execution, have been entered into the completed/failed map in zk but not
+  // deleted from the work-queue as that is a batched operation.
+  final private Set<String> runningZKTasks;
+  private final Object waitLock = new Object();
+
+  public OverseerCollectionProcessor(ZkStateReader zkStateReader, String myId,
+                                     final ShardHandler shardHandler,
+                                     String adminPath, Overseer.Stats stats) {
+    this(zkStateReader, myId, shardHandler.getShardHandlerFactory(), adminPath, stats, Overseer.getCollectionQueue(zkStateReader.getZkClient(), stats),
         Overseer.getRunningMap(zkStateReader.getZkClient()),
         Overseer.getCompletedMap(zkStateReader.getZkClient()), Overseer.getFailureMap(zkStateReader.getZkClient()));
   }
 
-  protected OverseerCollectionProcessor(ZkStateReader zkStateReader, String myId, ShardHandler shardHandler,
+  protected OverseerCollectionProcessor(ZkStateReader zkStateReader, String myId,
+                                        final ShardHandlerFactory shardHandlerFactory,
                                         String adminPath,
                                         Overseer.Stats stats,
                                         DistributedQueue workQueue,
@@ -184,107 +208,199 @@ public class OverseerCollectionProcessor
                                         DistributedMap failureMap) {
     this.zkStateReader = zkStateReader;
     this.myId = myId;
-    this.shardHandler = shardHandler;
+    this.shardHandlerFactory = shardHandlerFactory;
     this.adminPath = adminPath;
     this.workQueue = workQueue;
     this.runningMap = runningMap;
     this.completedMap = completedMap;
     this.failureMap = failureMap;
     this.stats = stats;
+    this.runningZKTasks = new HashSet<>();
+    this.runningTasks = new HashSet<>();
+    this.collectionWip = new HashSet<>();
+    this.completedTasks = new HashMap<>();
   }
   
   @Override
   public void run() {
-       log.info("Process current queue of collection creations");
-       LeaderStatus isLeader = amILeader();
-       while (isLeader == LeaderStatus.DONT_KNOW) {
-         log.debug("am_i_leader unclear {}", isLeader);
-         isLeader = amILeader();  // not a no, not a yes, try ask again
-       }
+    log.info("Process current queue of collection creations");
+    LeaderStatus isLeader = amILeader();
+    while (isLeader == LeaderStatus.DONT_KNOW) {
+      log.debug("am_i_leader unclear {}", isLeader);
+      isLeader = amILeader();  // not a no, not a yes, try asking again
+    }
+
+    String oldestItemInWorkQueue = null;
+    // hasLeftOverItems - used for avoiding re-execution of async tasks that were processed by a previous Overseer.
+    // This variable is set in case there's any task found on the workQueue when the OCP starts up and
+    // the id for the queue tail is used as a marker to check for the task in completed/failed map in zk.
+    // Beyond the marker, all tasks can safely be assumed to have never been executed.
+    boolean hasLeftOverItems = true;
+
+    try {
+      oldestItemInWorkQueue = workQueue.getTailId();
+    } catch (KeeperException e) {
+      // We don't need to handle this. It is just a fail-safe that lets us skip
+      // already-processed async calls.
+      SolrException.log(log, "", e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+
+    if (oldestItemInWorkQueue == null)
+      hasLeftOverItems = false;
+    else
+      log.debug("Found already existing elements in the work-queue. Last element: {}", oldestItemInWorkQueue);
 
     try {
       prioritizeOverseerNodes();
     } catch (Exception e) {
       log.error("Unable to prioritize overseer ", e);
-
     }
-    while (!this.isClosed) {
-         try {
-           isLeader = amILeader();
-           if (LeaderStatus.NO == isLeader) {
-             break;
-           }
-           else if (LeaderStatus.YES != isLeader) {
-             log.debug("am_i_leader unclear {}", isLeader);                  
-             continue; // not a no, not a yes, try asking again
-           }
-           
-           QueueEvent head = workQueue.peek(true);
-           if(isClosed) break;
-           final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
-
-           final String asyncId = (message.containsKey(ASYNC) && message.get(ASYNC) != null) ? (String) message.get(ASYNC) : null;
-
-           try {
-             if(message.containsKey(ASYNC) && message.get(ASYNC) != null && !runningMap.contains(message.getStr(ASYNC)))
-              runningMap.put(asyncId, null);
-           } catch (KeeperException.NodeExistsException e) {
-             // Just catch and do nothing. The runningMap.contains(..) check ensures that this is the only
-             // entry point into the runningMap.
-             // NOTE: Make sure to handle it as soon as OCP gets distributed/multi-threaded.
-           }
-
-           log.info("Overseer Collection Processor: Get the message id:" + head.getId() + " message:" + message.toString());
-           final String operation = message.getStr(QUEUE_OPERATION);
-           final TimerContext timerContext = stats.time("collection_" + operation); // even if operation is async, it is sync!
-           SolrResponse response = null;
-           try  {
-             response = processMessage(message, operation);
-           } finally {
-             timerContext.stop();
-           }
-
-           head.setBytes(SolrResponse.serializable(response));
-           if (!operation.equals(REQUESTSTATUS) && asyncId != null) {
-             if(response.getResponse().get("failure") != null || response.getResponse().get("exception") != null) {
-               failureMap.put(asyncId, null);
-             } else {
-               completedMap.put(asyncId, null);
-             }
-           }
-           if(asyncId != null)
-            runningMap.remove(asyncId);
-
-           workQueue.remove(head);
-
-           if (response.getResponse().get("failure") != null || response.getResponse().get("exception") != null)  {
-             stats.error("collection_" + operation);
-             stats.storeFailureDetails("collection_" + operation, message, response);
-           } else {
-             stats.success("collection_" + operation);
-           }
 
-          log.info("Overseer Collection Processor: Message id:" + head.getId() + " complete, response:"+ response.getResponse().toString());
+    // TODO: Make maxThreads configurable.
+
+    this.tpe = new ThreadPoolExecutor(5, 100, 0L, TimeUnit.MILLISECONDS,
+        new SynchronousQueue<Runnable>(),
+        new DefaultSolrThreadFactory("OverseerThreadFactory"));
+    try {
+      while (!this.isClosed) {
+        try {
+          isLeader = amILeader();
+          if (LeaderStatus.NO == isLeader) {
+            break;
+          } else if (LeaderStatus.YES != isLeader) {
+            log.debug("am_i_leader unclear {}", isLeader);
+            continue; // not a no, not a yes, try asking again
+          }
+
+          log.debug("Cleaning up work-queue. #Running tasks: {}", runningTasks.size());
+          cleanUpWorkQueue();
+
+          printTrackingMaps();
+
+          boolean waited = false;
+
+          while (runningTasks.size() > maxParallelThreads) {
+            synchronized (waitLock) {
+              waitLock.wait(100); // wait 100 ms or until a task completes
+            }
+            waited = true;
+          }
+
+          if (waited)
+            cleanUpWorkQueue();
+
+          List<QueueEvent> heads = workQueue.peekTopN(maxParallelThreads, runningZKTasks, 2000L);
+
+          if (heads == null)
+            continue;
+
+          log.debug("Got {} tasks from work-queue : [{}]", heads.size(), heads.toString());
+
+          if (isClosed) break;
+
+          for (QueueEvent head : heads) {
+            final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
+            String collectionName = message.containsKey(COLLECTION_PROP) ?
+                message.getStr(COLLECTION_PROP) : message.getStr("name");
+            String asyncId = message.getStr(ASYNC);
+            if (hasLeftOverItems) {
+              if (head.getId().equals(oldestItemInWorkQueue))
+                hasLeftOverItems = false;
+              if (asyncId != null && (completedMap.contains(asyncId) || failureMap.contains(asyncId))) {
+                log.debug("Found already processed task in workQueue, cleaning up. AsyncId [{}]",asyncId );
+                workQueue.remove(head);
+                continue;
+              }
+            }
+
+            if (!checkExclusivity(message, head.getId())) {
+              log.debug("Exclusivity check failed for [{}]", message.toString());
+              continue;
+            }
+
+            try {
+              markTaskAsRunning(head, collectionName, asyncId);
+              log.debug("Marked task [{}] as running", head.getId());
+            } catch (KeeperException.NodeExistsException e) {
+              // This should never happen
+              log.error("Tried to pick up task [{}] when it was already running!", head.getId());
+            } catch (InterruptedException e) {
+              log.error("Thread interrupted while trying to pick task for execution.", head.getId());
+              Thread.currentThread().interrupt();
+            }
+
+            log.info("Overseer Collection Processor: Get the message id:" + head.getId() + " message:" + message.toString());
+            String operation = message.getStr(Overseer.QUEUE_OPERATION);
+            Runner runner = new Runner(message,
+                operation, head);
+            tpe.execute(runner);
+          }
+
         } catch (KeeperException e) {
           if (e.code() == KeeperException.Code.SESSIONEXPIRED
               || e.code() == KeeperException.Code.CONNECTIONLOSS) {
-             log.warn("Overseer cannot talk to ZK");
-             return;
-           }
-           SolrException.log(log, "", e);
-           throw new ZooKeeperException(
-               SolrException.ErrorCode.SERVER_ERROR, "", e);
-         } catch (InterruptedException e) {
-           Thread.currentThread().interrupt();
-           return;
-         } catch (Exception e) {
-           SolrException.log(log, "", e);
-         }
-       }
+            log.warn("Overseer cannot talk to ZK");
+            return;
+          }
+          SolrException.log(log, "", e);
+          throw new ZooKeeperException(
+              SolrException.ErrorCode.SERVER_ERROR, "", e);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          return;
+        } catch (Exception e) {
+          SolrException.log(log, "", e);
+        }
+      }
+    } finally {
+      this.close();
+    }
   }
-  
+
+  private boolean checkExclusivity(ZkNodeProps message, String id) throws KeeperException, InterruptedException {
+    String collectionName = message.containsKey(COLLECTION_PROP) ?
+        message.getStr(COLLECTION_PROP) : message.getStr("name");
+
+    if(collectionName == null)
+      return true;
+
+    if(collectionWip.contains(collectionName))
+      return false;
+
+    if(runningZKTasks.contains(id))
+      return false;
+
+    return true;
+  }
+
+  private void cleanUpWorkQueue() throws KeeperException, InterruptedException {
+    synchronized (completedTasks) {
+      for (String id : completedTasks.keySet()) {
+        workQueue.remove(completedTasks.get(id));
+        runningZKTasks.remove(id);
+      }
+      completedTasks.clear();
+    }
+  }
+
   public void close() {
     isClosed = true;
+    if(tpe != null) {
+      if (!tpe.isShutdown()) {
+        tpe.shutdown();
+        try {
+          tpe.awaitTermination(60, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+          log.warn("Thread interrupted while waiting for OCP threadpool shutdown.");
+          Thread.currentThread().interrupt();
+        } finally {
+          if (!tpe.isShutdown())
+            tpe.shutdownNow();
+        }
+      }
+    }
   }
 
   private void prioritizeOverseerNodes() throws KeeperException, InterruptedException {
@@ -302,9 +418,8 @@ public class OverseerCollectionProcessor
     if(nodeNames.size()<2) return;
     boolean designateIsInFront = overseerDesignates.contains( nodeNames.get(0));
 
-//
     ArrayList<String> nodesTobePushedBack =  new ArrayList<>();
-    //ensure that the node right behind the leader , i.r at position 1 is a Overseer
+    // ensure that the node right behind the leader, i.e. at position 1, is an Overseer
     List<String> availableDesignates = new ArrayList<>();
 
     log.info("sorted nodes {}", nodeNames);//TODO to be removed
@@ -409,6 +524,7 @@ public class OverseerCollectionProcessor
 
   private void invokeOverseerOp(String nodeName, String op) {
     ModifiableSolrParams params = new ModifiableSolrParams();
+    ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
     params.set(CoreAdminParams.ACTION, CoreAdminAction.OVERSEEROP.toString());
     params.set("op", op);
     params.set("qt", adminPath);
@@ -443,8 +559,8 @@ public class OverseerCollectionProcessor
     log.info("According to ZK I (id=" + myId + ") am no longer a leader.");
     return LeaderStatus.NO;
   }
-  
-  
+
+
   protected SolrResponse processMessage(ZkNodeProps message, String operation) {
     log.warn("OverseerCollectionProcessor.processMessage : "+ operation + " , "+ message.toString());
 
@@ -481,12 +597,11 @@ public class OverseerCollectionProcessor
       } else if(LIST.isEqual(operation)) {
         listCollections(zkStateReader.getClusterState(), results);
       } else if (CLUSTERSTATUS.isEqual(operation)) {
-         getClusterStatus(zkStateReader.getClusterState(), message, results);
+        getClusterStatus(zkStateReader.getClusterState(), message, results);
       } else {
         throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:"
             + operation);
       }
-
     } catch (Exception e) {
       SolrException.log(log, "Collection " + operation + " of " + operation
           + " failed", e);
@@ -495,8 +610,7 @@ public class OverseerCollectionProcessor
       nl.add("msg", e.getMessage());
       nl.add("rspCode", e instanceof SolrException ? ((SolrException)e).code() : -1);
       results.add("exception", nl);
-    } 
-    
+    }
     return new OverseerSolrResponse(results);
   }
 
@@ -518,16 +632,16 @@ public class OverseerCollectionProcessor
     NamedList stateUpdateQueueStats = new NamedList();
     NamedList workQueueStats = new NamedList();
     NamedList collectionQueueStats = new NamedList();
-    for (Map.Entry<String, Overseer.Stat> entry : this.stats.getStats().entrySet()) {
+    for (Map.Entry<String, Overseer.Stat> entry : stats.getStats().entrySet()) {
       String key = entry.getKey();
       NamedList<Object> lst = new SimpleOrderedMap<>();
       if (key.startsWith("collection_"))  {
         collectionStats.add(key.substring(11), lst);
-        int successes = this.stats.getSuccessCount(entry.getKey());
-        int errors = this.stats.getErrorCount(entry.getKey());
+        int successes = stats.getSuccessCount(entry.getKey());
+        int errors = stats.getErrorCount(entry.getKey());
         lst.add("requests", successes);
         lst.add("errors", errors);
-        List<Overseer.FailedOp> failureDetails = this.stats.getFailureDetails(key);
+        List<Overseer.FailedOp> failureDetails = stats.getFailureDetails(key);
         if (failureDetails != null) {
           List<SimpleOrderedMap<Object>> failures = new ArrayList<>();
           for (Overseer.FailedOp failedOp : failureDetails) {
@@ -547,8 +661,8 @@ public class OverseerCollectionProcessor
       } else  {
         // overseer stats
         overseerStats.add(key, lst);
-        int successes = this.stats.getSuccessCount(entry.getKey());
-        int errors = this.stats.getErrorCount(entry.getKey());
+        int successes = stats.getSuccessCount(entry.getKey());
+        int errors = stats.getErrorCount(entry.getKey());
         lst.add("requests", successes);
         lst.add("errors", errors);
       }
@@ -751,6 +865,7 @@ public class OverseerCollectionProcessor
     String replicaName = message.getStr(REPLICA_PROP);
     DocCollection coll = clusterState.getCollection(collectionName);
     Slice slice = coll.getSlice(shard);
+    ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
     if(slice==null){
       throw new SolrException(ErrorCode.BAD_REQUEST, "Invalid shard name : "+shard+" in collection : "+ collectionName);
     }
@@ -780,7 +895,8 @@ public class OverseerCollectionProcessor
       log.warn("Exception trying to unload core " + sreq, e);
     }
     
-    collectShardResponses(!Slice.ACTIVE.equals(replica.getStr(Slice.STATE)) ? new NamedList() : results, false, null);
+    collectShardResponses(!Slice.ACTIVE.equals(replica.getStr(Slice.STATE)) ? new NamedList() : results,
+        false, null, shardHandler);
     
     if (waitForCoreNodeGone(collectionName, shard, replicaName, 5000)) return; // check if the core unload removed the corenode zk entry
     deleteCoreNode(collectionName, replicaName, replica, core); // try and ensure core info is removed from clusterstate
@@ -973,7 +1089,8 @@ public class OverseerCollectionProcessor
     
   }
 
-  private boolean createShard(ClusterState clusterState, ZkNodeProps message, NamedList results) throws KeeperException, InterruptedException {
+  private boolean createShard(ClusterState clusterState, ZkNodeProps message, NamedList results)
+      throws KeeperException, InterruptedException {
     log.info("Create shard invoked: {}", message);
     String collectionName = message.getStr(COLLECTION_PROP);
     String shard = message.getStr(SHARD_ID_PROP);
@@ -981,6 +1098,7 @@ public class OverseerCollectionProcessor
       throw new SolrException(ErrorCode.BAD_REQUEST, "'collection' and 'shard' are required parameters" );
     int numSlices = 1;
 
+    ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
     DocCollection collection = clusterState.getCollection(collectionName);
     int maxShardsPerNode = collection.getInt(MAX_SHARDS_PER_NODE, 1);
     int repFactor = message.getInt(REPLICATION_FACTOR, collection.getInt(REPLICATION_FACTOR, 1));
@@ -1033,7 +1151,7 @@ public class OverseerCollectionProcessor
 
     }
 
-    processResponses(results);
+    processResponses(results, shardHandler);
 
     log.info("Finished create command on all shards for collection: "
         + collectionName);
@@ -1047,6 +1165,7 @@ public class OverseerCollectionProcessor
     String collectionName = message.getStr("collection");
     String slice = message.getStr(ZkStateReader.SHARD_ID_PROP);
     String splitKey = message.getStr("split.key");
+    ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
 
     DocCollection collection = clusterState.getCollection(collectionName);
     DocRouter router = collection.getRouter() != null ? collection.getRouter() : DocRouter.DEFAULT;
@@ -1192,7 +1311,7 @@ public class OverseerCollectionProcessor
       // do not abort splitshard if the unloading fails
       // this can happen because the replicas created previously may be down
       // the only side effect of this is that the sub shard may end up having more replicas than we want
-      collectShardResponses(results, false, null);
+      collectShardResponses(results, false, null, shardHandler);
 
       String asyncId = message.getStr(ASYNC);
       HashMap<String, String> requestMap = new HashMap<String, String>();
@@ -1225,11 +1344,11 @@ public class OverseerCollectionProcessor
         params.set(CoreAdminParams.SHARD, subSlice);
         setupAsyncRequest(asyncId, requestMap, params, nodeName);
         addPropertyParams(message, params);
-        sendShardRequest(nodeName, params);
+        sendShardRequest(nodeName, params, shardHandler);
       }
 
       collectShardResponses(results, true,
-          "SPLITSHARD failed to create subshard leaders");
+          "SPLITSHARD failed to create subshard leaders", shardHandler);
 
       completeAsyncRequest(asyncId, requestMap, results);
 
@@ -1248,11 +1367,11 @@ public class OverseerCollectionProcessor
         ModifiableSolrParams p = new ModifiableSolrParams(cmd.getParams());
         setupAsyncRequest(asyncId, requestMap, p, nodeName);
 
-        sendShardRequest(nodeName, p);
+        sendShardRequest(nodeName, p, shardHandler);
       }
 
       collectShardResponses(results, true,
-          "SPLITSHARD timed out waiting for subshard leaders to come up");
+          "SPLITSHARD timed out waiting for subshard leaders to come up", shardHandler);
 
       completeAsyncRequest(asyncId, requestMap, results);
 
@@ -1273,9 +1392,10 @@ public class OverseerCollectionProcessor
       params.set(CoreAdminParams.RANGES, rangesStr);
       setupAsyncRequest(asyncId, requestMap, params, parentShardLeader.getNodeName());
 
-      sendShardRequest(parentShardLeader.getNodeName(), params);
+      sendShardRequest(parentShardLeader.getNodeName(), params, shardHandler);
 
-      collectShardResponses(results, true, "SPLITSHARD failed to invoke SPLIT core admin command");
+      collectShardResponses(results, true, "SPLITSHARD failed to invoke SPLIT core admin command",
+          shardHandler);
       completeAsyncRequest(asyncId, requestMap, results);
 
       log.info("Index on shard: " + nodeName + " split into two successfully");
@@ -1292,11 +1412,12 @@ public class OverseerCollectionProcessor
 
         setupAsyncRequest(asyncId, requestMap, params, nodeName);
 
-        sendShardRequest(nodeName, params);
+        sendShardRequest(nodeName, params, shardHandler);
       }
 
       collectShardResponses(results, true,
-          "SPLITSHARD failed while asking sub shard leaders to apply buffered updates");
+          "SPLITSHARD failed while asking sub shard leaders to apply buffered updates",
+          shardHandler);
 
       completeAsyncRequest(asyncId, requestMap, results);
 
@@ -1362,7 +1483,7 @@ public class OverseerCollectionProcessor
           //Not using this property. Do we really need to use it?
           //params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices);
 
-          sendShardRequest(subShardNodeName, params);
+          sendShardRequest(subShardNodeName, params, shardHandler);
 
           String coreNodeName = waitForCoreNodeName(collection, subShardNodeName, shardName);
           // wait for the replicas to be seen as active on sub shard leader
@@ -1378,13 +1499,14 @@ public class OverseerCollectionProcessor
 
           setupAsyncRequest(asyncId, requestMap, p, nodeName);
 
-          sendShardRequest(nodeName, p);
+          sendShardRequest(nodeName, p, shardHandler);
 
         }
       }
 
       collectShardResponses(results, true,
-          "SPLITSHARD failed to create subshard replicas or timed out waiting for them to come up");
+          "SPLITSHARD failed to create subshard replicas or timed out waiting for them to come up",
+          shardHandler);
 
       completeAsyncRequest(asyncId, requestMap, results);
 
@@ -1442,6 +1564,7 @@ public class OverseerCollectionProcessor
     }
   }
 
+
   static UpdateResponse softCommit(String url) throws SolrServerException, IOException {
     HttpSolrServer server = null;
     try {
@@ -1488,7 +1611,9 @@ public class OverseerCollectionProcessor
     throw new SolrException(ErrorCode.SERVER_ERROR, "Could not find coreNodeName");
   }
 
-  private void collectShardResponses(NamedList results, boolean abortOnError, String msgOnError) {
+  private void collectShardResponses(NamedList results, boolean abortOnError,
+                                     String msgOnError,
+                                     ShardHandler shardHandler) {
     ShardResponse srsp;
     do {
       srsp = shardHandler.takeCompletedOrError();
@@ -1529,14 +1654,15 @@ public class OverseerCollectionProcessor
           "The slice: " + slice.getName() + " is currently "
           + slice.getState() + ". Only INACTIVE (or custom-hashed) slices can be deleted.");
     }
+    ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
 
     try {
       ModifiableSolrParams params = new ModifiableSolrParams();
       params.set(CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString());
       params.set(CoreAdminParams.DELETE_INDEX, "true");
-      sliceCmd(clusterState, params, null, slice);
+      sliceCmd(clusterState, params, null, slice, shardHandler);
 
-      processResponses(results);
+      processResponses(results, shardHandler);
 
       ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
           Overseer.REMOVESHARD, ZkStateReader.COLLECTION_PROP, collection,
@@ -1622,7 +1748,7 @@ public class OverseerCollectionProcessor
     if (clusterState.hasCollection(tempSourceCollectionName)) {
       log.info("Deleting temporary collection: " + tempSourceCollectionName);
       Map<String, Object> props = ZkNodeProps.makeMap(
-          QUEUE_OPERATION, DELETECOLLECTION,
+          Overseer.QUEUE_OPERATION, DELETECOLLECTION,
           "name", tempSourceCollectionName);
 
       try {
@@ -1635,6 +1761,8 @@ public class OverseerCollectionProcessor
     CompositeIdRouter sourceRouter = (CompositeIdRouter) sourceCollection.getRouter();
     DocRouter.Range keyHashRange = sourceRouter.keyHashRange(splitKey);
 
+    ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
+
     log.info("Hash range for split.key: {} is: {}", splitKey, keyHashRange);
     // intersect source range, keyHashRange and target range
     // this is the range that has to be split from source and transferred to target
@@ -1657,9 +1785,10 @@ public class OverseerCollectionProcessor
     String nodeName = targetLeader.getNodeName();
     setupAsyncRequest(asyncId, requestMap, params, nodeName);
 
-    sendShardRequest(targetLeader.getNodeName(), params);
+    sendShardRequest(targetLeader.getNodeName(), params, shardHandler);
 
-    collectShardResponses(results, true, "MIGRATE failed to request node to buffer updates");
+    collectShardResponses(results, true, "MIGRATE failed to request node to buffer updates",
+        shardHandler);
 
     completeAsyncRequest(asyncId, requestMap, results);
 
@@ -1703,7 +1832,7 @@ public class OverseerCollectionProcessor
     // create a temporary collection with just one node on the shard leader
     String configName = zkStateReader.readConfigName(sourceCollection.getName());
     Map<String, Object> props = ZkNodeProps.makeMap(
-        QUEUE_OPERATION, CREATECOLLECTION,
+        Overseer.QUEUE_OPERATION, CREATECOLLECTION,
         "name", tempSourceCollectionName,
         REPLICATION_FACTOR, 1,
         NUM_SLICES, 1,
@@ -1733,10 +1862,11 @@ public class OverseerCollectionProcessor
     cmd.setState(ZkStateReader.ACTIVE);
     cmd.setCheckLive(true);
     cmd.setOnlyIfLeader(true);
-    sendShardRequest(tempSourceLeader.getNodeName(), new ModifiableSolrParams(cmd.getParams()));
+    sendShardRequest(tempSourceLeader.getNodeName(), new ModifiableSolrParams(cmd.getParams()), shardHandler);
 
     collectShardResponses(results, true,
-        "MIGRATE failed to create temp collection leader or timed out waiting for it to come up");
+        "MIGRATE failed to create temp collection leader or timed out waiting for it to come up",
+        shardHandler);
 
     log.info("Asking source leader to split index");
     params = new ModifiableSolrParams();
@@ -1750,8 +1880,8 @@ public class OverseerCollectionProcessor
 
     setupAsyncRequest(asyncId, requestMap, params, tempNodeName);
 
-    sendShardRequest(tempNodeName, params);
-    collectShardResponses(results, true, "MIGRATE failed to invoke SPLIT core admin command");
+    sendShardRequest(tempNodeName, params, shardHandler);
+    collectShardResponses(results, true, "MIGRATE failed to invoke SPLIT core admin command", shardHandler);
     completeAsyncRequest(asyncId, requestMap, results);
 
     log.info("Creating a replica of temporary collection: {} on the target leader node: {}",
@@ -1764,9 +1894,10 @@ public class OverseerCollectionProcessor
     params.set(CoreAdminParams.SHARD, tempSourceSlice.getName());
 
     setupAsyncRequest(asyncId, requestMap, params, targetLeader.getNodeName());
-    sendShardRequest(targetLeader.getNodeName(), params);
+    sendShardRequest(targetLeader.getNodeName(), params, shardHandler);
     collectShardResponses(results, true,
-        "MIGRATE failed to create replica of temporary collection in target leader node.");
+        "MIGRATE failed to create replica of temporary collection in target leader node.",
+        shardHandler);
 
     completeAsyncRequest(asyncId, requestMap, results);
 
@@ -1785,10 +1916,11 @@ public class OverseerCollectionProcessor
 
     setupAsyncRequest(asyncId, requestMap, params, tempSourceLeader.getNodeName());
 
-    sendShardRequest(tempSourceLeader.getNodeName(), params);
+    sendShardRequest(tempSourceLeader.getNodeName(), params, shardHandler);
 
     collectShardResponses(results, true,
-        "MIGRATE failed to create temp collection replica or timed out waiting for them to come up");
+        "MIGRATE failed to create temp collection replica or timed out waiting for them to come up",
+        shardHandler);
 
     completeAsyncRequest(asyncId, requestMap, results);
     log.info("Successfully created replica of temp source collection on target leader node");
@@ -1801,9 +1933,11 @@ public class OverseerCollectionProcessor
 
     setupAsyncRequest(asyncId, requestMap, params, sourceLeader.getNodeName());
 
-    sendShardRequest(targetLeader.getNodeName(), params);
+    sendShardRequest(targetLeader.getNodeName(), params, shardHandler);
     collectShardResponses(results, true,
-        "MIGRATE failed to merge " + tempCollectionReplica2 + " to " + targetLeader.getStr("core") + " on node: " + targetLeader.getNodeName());
+        "MIGRATE failed to merge " + tempCollectionReplica2 +
+            " to " + targetLeader.getStr("core") + " on node: " + targetLeader.getNodeName(),
+        shardHandler);
 
     completeAsyncRequest(asyncId, requestMap, results);
 
@@ -1813,16 +1947,17 @@ public class OverseerCollectionProcessor
     params.set(CoreAdminParams.NAME, targetLeader.getStr("core"));
     setupAsyncRequest(asyncId, requestMap, params, targetLeader.getNodeName());
 
-    sendShardRequest(targetLeader.getNodeName(), params);
+    sendShardRequest(targetLeader.getNodeName(), params, shardHandler);
     collectShardResponses(results, true,
-        "MIGRATE failed to request node to apply buffered updates");
+        "MIGRATE failed to request node to apply buffered updates",
+        shardHandler);
 
     completeAsyncRequest(asyncId, requestMap, results);
 
     try {
       log.info("Deleting temporary collection: " + tempSourceCollectionName);
       props = ZkNodeProps.makeMap(
-          QUEUE_OPERATION, DELETECOLLECTION,
+          Overseer.QUEUE_OPERATION, DELETECOLLECTION,
           "name", tempSourceCollectionName);
       deleteCollection(new ZkNodeProps(props), results);
     } catch (Exception e) {
@@ -1860,7 +1995,7 @@ public class OverseerCollectionProcessor
     }
   }
 
-  private void sendShardRequest(String nodeName, ModifiableSolrParams params) {
+  private void sendShardRequest(String nodeName, ModifiableSolrParams params, ShardHandler shardHandler) {
     ShardRequest sreq = new ShardRequest();
     params.set("qt", adminPath);
     sreq.purpose = 1;
@@ -1880,6 +2015,7 @@ public class OverseerCollectionProcessor
       }
     }
   }
+
   private void createCollection(ClusterState clusterState, ZkNodeProps message, NamedList results) throws KeeperException, InterruptedException {
     String collectionName = message.getStr("name");
     if (clusterState.hasCollection(collectionName)) {
@@ -1892,9 +2028,9 @@ public class OverseerCollectionProcessor
       
       int repFactor = message.getInt( REPLICATION_FACTOR, 1);
 
+      ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
       String async = null;
-      if (message.containsKey("async"))
-        async = message.getStr("async");
+      async = message.getStr("async");
 
       Integer numSlices = message.getInt(NUM_SLICES, null);
       String router = message.getStr("router.name", DocRouter.DEFAULT_NAME);
@@ -1984,7 +2120,8 @@ public class OverseerCollectionProcessor
       // For tracking async calls.
       HashMap<String, String> requestMap = new HashMap<String, String>();
 
-      log.info("Creating SolrCores for new collection, shardNames {} , replicationFactor : {}", shardNames, repFactor);
+      log.info("Creating SolrCores for new collection {}, shardNames {} , replicationFactor : {}",
+          collectionName, shardNames, repFactor);
       Map<String ,ShardRequest> coresToCreate = new LinkedHashMap<>();
       for (int i = 1; i <= shardNames.size(); i++) {
         String sliceName = shardNames.get(i-1);
@@ -2049,7 +2186,7 @@ public class OverseerCollectionProcessor
         }
       }
 
-      processResponses(results);
+      processResponses(results, shardHandler);
 
       completeAsyncRequest(async, requestMap, results);
 
@@ -2107,6 +2244,7 @@ public class OverseerCollectionProcessor
       throw new SolrException(ErrorCode.BAD_REQUEST,
           "Collection: " + collection + " shard: " + shard + " does not exist");
     }
+    ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
 
     if(node== null){
       node = getNodesForNewShard(clusterState,collection, coll.getSlices().size() , coll.getInt(MAX_SHARDS_PER_NODE, 1),coll.getInt(REPLICATION_FACTOR, 1),null).get(0).nodeName;
@@ -2179,14 +2317,13 @@ public class OverseerCollectionProcessor
     }
     addPropertyParams(message, params);
 
-    sendShardRequest(node, params);
+    sendShardRequest(node, params, shardHandler);
 
     collectShardResponses(results, true,
-      "ADDREPLICA failed to create replica");
+        "ADDREPLICA failed to create replica", shardHandler);
   }
 
-
-  private void processResponses(NamedList results) {
+  private void processResponses(NamedList results, ShardHandler shardHandler) {
     ShardResponse srsp;
     do {
       srsp = shardHandler.takeCompletedOrError();
@@ -2196,7 +2333,6 @@ public class OverseerCollectionProcessor
     } while (srsp != null);
   }
 
-
   private String createConfNode(String coll, ZkNodeProps message, boolean isLegacyCloud) throws KeeperException, InterruptedException {
     String configName = message.getStr(OverseerCollectionProcessor.COLL_CONF);
     if(configName == null){
@@ -2234,19 +2370,21 @@ public class OverseerCollectionProcessor
   private void collectionCmd(ClusterState clusterState, ZkNodeProps message, ModifiableSolrParams params, NamedList results, String stateMatcher) {
     log.info("Executing Collection Cmd : " + params);
     String collectionName = message.getStr("name");
+    ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
     
     DocCollection coll = clusterState.getCollection(collectionName);
     
     for (Map.Entry<String,Slice> entry : coll.getSlicesMap().entrySet()) {
       Slice slice = entry.getValue();
-      sliceCmd(clusterState, params, stateMatcher, slice);
+      sliceCmd(clusterState, params, stateMatcher, slice, shardHandler);
     }
 
-    processResponses(results);
+    processResponses(results, shardHandler);
 
   }
 
-  private void sliceCmd(ClusterState clusterState, ModifiableSolrParams params, String stateMatcher, Slice slice) {
+  private void sliceCmd(ClusterState clusterState, ModifiableSolrParams params, String stateMatcher,
+                        Slice slice, ShardHandler shardHandler) {
     Map<String,Replica> shards = slice.getReplicasMap();
     Set<Map.Entry<String,Replica>> shardEntries = shards.entrySet();
     for (Map.Entry<String,Replica> shardEntry : shardEntries) {
@@ -2314,12 +2452,13 @@ public class OverseerCollectionProcessor
 
   private void waitForAsyncCallsToComplete(Map<String, String> requestMap, NamedList results) {
     for(String k:requestMap.keySet()) {
-      log.debug("I am Waiting for : " + k + "/" + requestMap.get(k));
+      log.debug("I am Waiting for :{}/{}", k, requestMap.get(k));
       results.add(requestMap.get(k), waitForCoreAdminAsyncCallToComplete(k, requestMap.get(k)));
     }
   }
 
   private NamedList waitForCoreAdminAsyncCallToComplete(String nodeName, String requestId) {
+    ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
     ModifiableSolrParams params = new ModifiableSolrParams();
     params.set(CoreAdminParams.ACTION, CoreAdminAction.REQUESTSTATUS.toString());
     params.set(CoreAdminParams.REQUESTID, requestId);
@@ -2348,7 +2487,7 @@ public class OverseerCollectionProcessor
             try {
               Thread.sleep(1000);
             } catch (InterruptedException e) {
-              e.printStackTrace();
+              Thread.currentThread().interrupt();
             }
             continue;
 
@@ -2377,6 +2516,162 @@ public class OverseerCollectionProcessor
       } while (srsp != null);
     } while(true);
   }
+
+  private void markTaskAsRunning(QueueEvent head, String collectionName,String asyncId)
+      throws KeeperException, InterruptedException {
+    synchronized (runningZKTasks) {
+      runningZKTasks.add(head.getId());
+    }
+
+    synchronized (runningTasks) {
+      runningTasks.add(head.getId());
+    }
+    if(collectionName != null) {
+      synchronized (collectionWip) {
+        collectionWip.add(collectionName);
+      }
+    }
+
+    if(asyncId != null)
+      runningMap.put(asyncId, null);
+  }
+  
+  protected class Runner implements Runnable {
+    ZkNodeProps message;
+    String operation;
+    SolrResponse response;
+    QueueEvent head;
+  
+    public Runner(ZkNodeProps message, String operation, QueueEvent head) {
+      this.message = message;
+      this.operation = operation;
+      this.head = head;
+      response = null;
+    }
+
+
+    @Override
+    public void run() {
+
+      final TimerContext timerContext = stats.time("collection_" + operation);
+
+      boolean success = false;
+      String asyncId = message.getStr(ASYNC);
+      String collectionName = message.containsKey(COLLECTION_PROP) ?
+          message.getStr(COLLECTION_PROP) : message.getStr("name");
+      try {
+        try {
+          log.debug("Runner processing {}", head.getId());
+          response = processMessage(message, operation);
+        } finally {
+          timerContext.stop();
+          updateStats();
+        }
+
+        if(asyncId != null) {
+          if (response != null && (response.getResponse().get("failure") != null || response.getResponse().get("exception") != null)) {
+            failureMap.put(asyncId, null);
+            log.debug("Updated failed map for task with zkid:[{}]", head.getId());
+          } else {
+            completedMap.put(asyncId, null);
+            log.debug("Updated completed map for task with zkid:[{}]", head.getId());
+          }
+        } else {
+          head.setBytes(SolrResponse.serializable(response));
+          log.debug("Completed task:[{}]", head.getId());
+        }
+
+        markTaskComplete(head.getId(), asyncId, collectionName);
+        log.debug("Marked task [{}] as completed.", head.getId());
+        printTrackingMaps();
+
+        log.info("Overseer Collection Processor: Message id:" + head.getId() +
+            " complete, response:" + response.getResponse().toString());
+        success = true;
+      } catch (KeeperException e) {
+        SolrException.log(log, "", e);
+      } catch (InterruptedException e) {
+        // Reset task from tracking data structures so that it can be retried.
+        resetTaskWithException(head.getId(), asyncId, collectionName);
+        log.warn("Resetting task {} as the thread was interrupted.", head.getId());
+        Thread.currentThread().interrupt();
+      } finally {
+        if(!success) {
+          // Reset task from tracking data structures so that it can be retried.
+          resetTaskWithException(head.getId(), asyncId, collectionName);
+        }
+        synchronized (waitLock){
+          waitLock.notifyAll();
+        }
+      }
+    }
+
+    private void markTaskComplete(String id, String asyncId, String collectionName)
+        throws KeeperException, InterruptedException {
+      synchronized (completedTasks) {
+        completedTasks.put(id, head);
+      }
+
+      synchronized (runningTasks) {
+        runningTasks.remove(id);
+      }
+
+      if(asyncId != null)
+        runningMap.remove(asyncId);
+
+      if(collectionName != null) {
+        synchronized (collectionWip) {
+          collectionWip.remove(collectionName);
+        }
+      }
+    }
+
+    private void resetTaskWithException(String id, String asyncId, String collectionName) {
+      log.warn("Resetting task: {}, requestid: {}, collectionName: {}", id, asyncId, collectionName);
+      try {
+        if (asyncId != null)
+          runningMap.remove(asyncId);
+
+        synchronized (runningTasks) {
+          runningTasks.remove(id);
+        }
+
+        if (collectionName != null) {
+          synchronized (collectionWip) {
+            collectionWip.remove(collectionName);
+          }
+        }
+      } catch (KeeperException e) {
+        SolrException.log(log, "", e);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+
+    }
+
+    private void updateStats() {
+      if (isSuccessful()) {
+        stats.success("collection_" + operation);
+      } else {
+        stats.error("collection_" + operation);
+        stats.storeFailureDetails("collection_" + operation, message, response);
+      }
+    }
+
+    private boolean isSuccessful() {
+      if(response == null)
+        return false;
+      return !(response.getResponse().get("failure") != null || response.getResponse().get("exception") != null);
+    }
+  }
+
+  private void printTrackingMaps() {
+    log.debug("RunningTasks: {}", runningTasks.toString());
+    log.debug("CompletedTasks: {}", completedTasks.keySet().toString());
+    log.debug("RunningZKTasks: {}", runningZKTasks.toString());
+  }
+
+
   String getId(){
     return myId;
   }
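
A minimal sketch of the dispatch pattern the new run() loop implements, with in-memory stand-ins for the ZK-backed queue and maps (TaskDispatcher, Task, dispatchOnce, and process are illustrative names, not Solr APIs):

    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;

    class TaskDispatcher {
      static final class Task {
        final String id, collection;
        Task(String id, String collection) { this.id = id; this.collection = collection; }
      }

      private final BlockingQueue<Task> workQueue = new LinkedBlockingQueue<>();
      private final Set<String> runningIds = Collections.synchronizedSet(new HashSet<String>());
      private final Set<String> collectionsInProgress = Collections.synchronizedSet(new HashSet<String>());
      private final ExecutorService pool = Executors.newFixedThreadPool(10);

      void dispatchOnce() throws InterruptedException {
        final Task head = workQueue.poll(2, TimeUnit.SECONDS);  // stands in for workQueue.peekTopN(...)
        if (head == null) return;
        // checkExclusivity(): never run two tasks for one collection, never run one task twice
        if (runningIds.contains(head.id) || collectionsInProgress.contains(head.collection)) {
          workQueue.put(head);  // leave it for a later pass
          return;
        }
        runningIds.add(head.id);                     // markTaskAsRunning()
        collectionsInProgress.add(head.collection);
        pool.execute(new Runnable() {
          @Override
          public void run() {
            try {
              process(head);                         // processMessage() in the real code
            } finally {
              collectionsInProgress.remove(head.collection);  // markTaskComplete()
              runningIds.remove(head.id);
            }
          }
        });
      }

      void process(Task t) { /* operation-specific work */ }
    }

The real loop additionally throttles when runningTasks exceeds maxParallelThreads (via waitLock), batches queue cleanup through completedTasks, and on startup uses the work-queue tail id as a marker to skip tasks a previous Overseer already recorded in the completed/failed maps.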

Modified: lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1596817&r1=1596816&r2=1596817&view=diff
==============================================================================
--- lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/cloud/ZkController.java Thu May 22 11:38:47 2014
@@ -1737,7 +1737,15 @@ public final class ZkController {
   public boolean ensureReplicaInLeaderInitiatedRecovery(final String collection, 
       final String shardId, final String replicaUrl, final ZkCoreNodeProps replicaCoreProps, boolean forcePublishState) 
           throws KeeperException, InterruptedException 
-  {
+  {
+    if (collection == null)
+      throw new IllegalArgumentException("collection parameter cannot be null for starting leader-initiated recovery for replica: "+replicaUrl);
+
+    if (shardId == null)
+      throw new IllegalArgumentException("shard parameter cannot be null for starting leader-initiated recovery for replica: "+replicaUrl);
+    
+    if (replicaUrl == null)
+      throw new IllegalArgumentException("replicaUrl parameter cannot be null for starting leader-initiated recovery");
     
     // First, determine if this replica is already in recovery handling
     // which is needed because there can be many concurrent errors flooding in
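
The guards added to ensureReplicaInLeaderInitiatedRecovery simply fail fast before any ZooKeeper work begins. For comparison only, the same contract written with java.util.Objects.requireNonNull (Java 7+); note it throws NullPointerException where the commit deliberately uses IllegalArgumentException (class and method names below are illustrative, not part of ZkController):

    import java.util.Objects;

    final class LirPreconditions {
      static void check(String collection, String shardId, String replicaUrl) {
        Objects.requireNonNull(replicaUrl,
            "replicaUrl parameter cannot be null for starting leader-initiated recovery");
        Objects.requireNonNull(collection,
            "collection parameter cannot be null for starting leader-initiated recovery for replica: " + replicaUrl);
        Objects.requireNonNull(shardId,
            "shard parameter cannot be null for starting leader-initiated recovery for replica: " + replicaUrl);
      }
    }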

Modified: lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/core/DirectoryFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/core/DirectoryFactory.java?rev=1596817&r1=1596816&r2=1596817&view=diff
==============================================================================
--- lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/core/DirectoryFactory.java (original)
+++ lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/core/DirectoryFactory.java Thu May 22 11:38:47 2014
@@ -245,6 +245,7 @@ public abstract class DirectoryFactory i
 
   public String getDataHome(CoreDescriptor cd) throws IOException {
     // by default, we go off the instance directory
-    return normalize(SolrResourceLoader.normalizeDir(cd.getInstanceDir()) + cd.getDataDir());
+    String instanceDir = new File(cd.getInstanceDir()).getAbsolutePath();
+    return normalize(SolrResourceLoader.normalizeDir(instanceDir) + cd.getDataDir());
   }
 }
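
The getDataHome change matters when a core's instanceDir is relative: it is now resolved against the JVM working directory before dataDir is appended, instead of being concatenated verbatim. A small illustration with hypothetical paths:

    import java.io.File;

    public class DataHomeDemo {
      public static void main(String[] args) {
        String instanceDir = "cores/collection1/";  // hypothetical relative instanceDir
        String dataDir = "data/";
        // Before: a relative, context-dependent result, e.g. cores/collection1/data/
        System.out.println(instanceDir + dataDir);
        // After: anchored to the working directory, e.g. /opt/solr/cores/collection1/data/
        System.out.println(new File(instanceDir).getAbsolutePath() + File.separator + dataDir);
      }
    }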

Modified: lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/core/SolrCore.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/core/SolrCore.java?rev=1596817&r1=1596816&r2=1596817&view=diff
==============================================================================
--- lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/core/SolrCore.java (original)
+++ lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/core/SolrCore.java Thu May 22 11:38:47 2014
@@ -161,6 +161,7 @@ public final class SolrCore implements S
   private final SolrResourceLoader resourceLoader;
   private volatile IndexSchema schema;
   private final String dataDir;
+  private final String ulogDir;
   private final UpdateHandler updateHandler;
   private final SolrCoreState solrCoreState;
   
@@ -242,6 +243,10 @@ public final class SolrCore implements S
     return dataDir;
   }
 
+  public String getUlogDir() {
+    return ulogDir;
+  }
+
   public String getIndexDir() {
     synchronized (searcherLock) {
       if (_searcher == null) return getNewIndexDir();
@@ -654,6 +659,7 @@ public final class SolrCore implements S
     this.setName(name);
     this.schema = null;
     this.dataDir = null;
+    this.ulogDir = null;
     this.solrConfig = null;
     this.startTime = System.currentTimeMillis();
     this.maxWarmingSearchers = 2;  // we don't have a config yet, just pick a number.
@@ -687,7 +693,7 @@ public final class SolrCore implements S
     if (updateHandler == null) {
       initDirectoryFactory();
     }
-    
+
     if (dataDir == null) {
       if (cd.usingDefaultDataDir()) dataDir = config.getDataDir();
       if (dataDir == null) {
@@ -701,8 +707,18 @@ public final class SolrCore implements S
         }
       }
     }
-
     dataDir = SolrResourceLoader.normalizeDir(dataDir);
+
+    String updateLogDir = cd.getUlogDir();
+    if (updateLogDir == null) {
+      updateLogDir = dataDir;
+      if (new File(updateLogDir).isAbsolute() == false) {
+        updateLogDir = SolrResourceLoader.normalizeDir(cd.getInstanceDir()) + updateLogDir;
+      }
+    }
+    ulogDir = updateLogDir;
+
+
     log.info(logid+"Opening new SolrCore at " + resourceLoader.getInstanceDir() + ", dataDir="+dataDir);
 
     if (null != cd && null != cd.getCloudDescriptor()) {
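
SolrCore now records the update-log directory next to dataDir with a simple fallback: an explicit ulogDir from the CoreDescriptor wins; otherwise the update log lives under dataDir, made absolute against the instance directory when needed. The same resolution as a standalone sketch (class and method names are illustrative):

    import java.io.File;

    final class UlogDirResolution {
      static String resolveUlogDir(String descriptorUlogDir, String dataDir, String normalizedInstanceDir) {
        String dir = descriptorUlogDir;
        if (dir == null) {
          dir = dataDir;
          if (!new File(dir).isAbsolute()) {
            dir = normalizedInstanceDir + dir;  // instanceDir assumed normalized with a trailing '/'
          }
        }
        return dir;
      }
    }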

Modified: lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java?rev=1596817&r1=1596816&r2=1596817&view=diff
==============================================================================
--- lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java (original)
+++ lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java Thu May 22 11:38:47 2014
@@ -283,16 +283,16 @@ public class CollectionsHandler extends 
         success.add("state", "completed");
         success.add("msg", "found " + requestId + " in completed tasks");
         results.add("status", success);
-      } else if (coreContainer.getZkController().getOverseerRunningMap().contains(requestId)) {
-        SimpleOrderedMap success = new SimpleOrderedMap();
-        success.add("state", "running");
-        success.add("msg", "found " + requestId + " in submitted tasks");
-        results.add("status", success);
       } else if (coreContainer.getZkController().getOverseerFailureMap().contains(requestId)) {
         SimpleOrderedMap success = new SimpleOrderedMap();
         success.add("state", "failed");
         success.add("msg", "found " + requestId + " in failed tasks");
         results.add("status", success);
+      } else if (coreContainer.getZkController().getOverseerRunningMap().contains(requestId)) {
+        SimpleOrderedMap success = new SimpleOrderedMap();
+        success.add("state", "running");
+        success.add("msg", "found " + requestId + " in submitted tasks");
+        results.add("status", success);
       } else {
         SimpleOrderedMap failure = new SimpleOrderedMap();
         failure.add("state", "notfound");

Modified: lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java?rev=1596817&r1=1596816&r2=1596817&view=diff
==============================================================================
--- lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java (original)
+++ lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java Thu May 22 11:38:47 2014
@@ -1032,8 +1032,9 @@ public class CoreAdminHandler extends Re
               
               boolean onlyIfActiveCheckResult = onlyIfLeaderActive != null && onlyIfLeaderActive && (localState == null || !localState.equals(ZkStateReader.ACTIVE));
               log.info("In WaitForState("+waitForState+"): collection="+collection+", shard="+slice.getName()+
+                  ", thisCore="+core.getName()+", leaderDoesNotNeedRecovery="+leaderDoesNotNeedRecovery+
                   ", isLeader? "+core.getCoreDescriptor().getCloudDescriptor().isLeader()+
-                  ", live="+live+", currentState="+state+", localState="+localState+", nodeName="+nodeName+
+                  ", live="+live+", checkLive="+checkLive+", currentState="+state+", localState="+localState+", nodeName="+nodeName+
                   ", coreNodeName="+coreNodeName+", onlyIfActiveCheckResult="+onlyIfActiveCheckResult+", nodeProps: "+nodeProps);
 
               if (!onlyIfActiveCheckResult && nodeProps != null && (state.equals(waitForState) || leaderDoesNotNeedRecovery)) {
@@ -1345,11 +1346,13 @@ public class CoreAdminHandler extends Re
    * Helper method to add a task to a tracking map.
    */
   protected void addTask(String map, TaskObject o, boolean limit) {
-    if(limit && getMap(map).size() == MAX_TRACKED_REQUESTS) {
-      String key = getMap(map).entrySet().iterator().next().getKey();
-      getMap(map).remove(key);
+    synchronized (getMap(map)) {
+      if(limit && getMap(map).size() == MAX_TRACKED_REQUESTS) {
+        String key = getMap(map).entrySet().iterator().next().getKey();
+        getMap(map).remove(key);
+      }
+      addTask(map, o);
     }
-    addTask(map, o);
   }
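
Synchronizing addTask on the tracking map closes a check-then-act race: two threads could both observe size() == MAX_TRACKED_REQUESTS and evict twice, or interleave and push the map past its cap. The same bounded, insertion-ordered behavior can also be had from LinkedHashMap's removeEldestEntry hook; a sketch, with the cap value assumed for illustration:

    import java.util.Collections;
    import java.util.LinkedHashMap;
    import java.util.Map;

    final class BoundedTaskMap {
      static final int MAX_TRACKED_REQUESTS = 100;  // assumed value, for illustration

      static <V> Map<String, V> create() {
        return Collections.synchronizedMap(new LinkedHashMap<String, V>(16, 0.75f, false) {
          @Override
          protected boolean removeEldestEntry(Map.Entry<String, V> eldest) {
            // evict the oldest tracked task once the cap is exceeded
            return size() > MAX_TRACKED_REQUESTS;
          }
        });
      }
    }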
 
 

Modified: lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java?rev=1596817&r1=1596816&r2=1596817&view=diff
==============================================================================
--- lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java (original)
+++ lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java Thu May 22 11:38:47 2014
@@ -405,6 +405,10 @@ public class HttpShardHandler extends Sh
     ClientUtils.addSlices(target, collectionName, slices, multiCollection);
   }
 
+  public ShardHandlerFactory getShardHandlerFactory(){
+    return httpShardHandlerFactory;
+  }
+
 
 
 }

Modified: lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/handler/component/ShardHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/handler/component/ShardHandler.java?rev=1596817&r1=1596816&r2=1596817&view=diff
==============================================================================
--- lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/handler/component/ShardHandler.java (original)
+++ lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/handler/component/ShardHandler.java Thu May 22 11:38:47 2014
@@ -25,4 +25,5 @@ public abstract class ShardHandler {
   public abstract ShardResponse takeCompletedIncludingErrors();
   public abstract ShardResponse takeCompletedOrError();
   public abstract void cancelAll();
+  public abstract ShardHandlerFactory getShardHandlerFactory();
 }

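Taken together, the two ShardHandler hunks add an accessor pattern: the abstract base class now exposes the factory that created the handler, and HttpShardHandler returns its httpShardHandlerFactory, presumably so code that receives only a ShardHandler can reach the factory and create additional handlers. A compact sketch with stand-in types:

    public class ShardHandlerAccessorSketch {
      static abstract class ShardHandlerFactory {
        abstract ShardHandler getShardHandler();
      }
      static abstract class ShardHandler {
        // the abstract accessor these hunks add to the base class
        public abstract ShardHandlerFactory getShardHandlerFactory();
      }
      static class HttpShardHandler extends ShardHandler {
        private final ShardHandlerFactory httpShardHandlerFactory;
        HttpShardHandler(ShardHandlerFactory factory) { this.httpShardHandlerFactory = factory; }
        @Override public ShardHandlerFactory getShardHandlerFactory() {
          return httpShardHandlerFactory;
        }
      }
      // Any caller holding a handler can now obtain a sibling from the same factory:
      static ShardHandler sibling(ShardHandler h) {
        return h.getShardHandlerFactory().getShardHandler();
      }
    }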
Modified: lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/search/Insanity.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/search/Insanity.java?rev=1596817&r1=1596816&r2=1596817&view=diff
==============================================================================
--- lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/search/Insanity.java (original)
+++ lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/search/Insanity.java Thu May 22 11:38:47 2014
@@ -65,7 +65,7 @@ public class Insanity {
       for (FieldInfo fi : in.getFieldInfos()) {
         if (fi.name.equals(insaneField)) {
           filteredInfos.add(new FieldInfo(fi.name, fi.isIndexed(), fi.number, fi.hasVectors(), fi.omitsNorms(),
-                                          fi.hasPayloads(), fi.getIndexOptions(), null, fi.getNormType(), null));
+                                          fi.hasPayloads(), fi.getIndexOptions(), null, fi.getNormType(), -1, null));
         } else {
           filteredInfos.add(fi);
         }

Modified: lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/spelling/suggest/SolrSuggester.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/spelling/suggest/SolrSuggester.java?rev=1596817&r1=1596816&r2=1596817&view=diff
==============================================================================
--- lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/spelling/suggest/SolrSuggester.java (original)
+++ lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/spelling/suggest/SolrSuggester.java Thu May 22 11:38:47 2014
@@ -17,6 +17,7 @@ package org.apache.solr.spelling.suggest
  * limitations under the License.
  */
 
+import java.io.Closeable;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
@@ -28,6 +29,7 @@ import org.apache.lucene.search.suggest.
 import org.apache.lucene.search.suggest.Lookup;
 import org.apache.lucene.util.IOUtils;
 import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.CloseHook;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.search.SolrIndexSearcher;
 import org.slf4j.Logger;
@@ -101,7 +103,22 @@ public class SolrSuggester {
     // initialize appropriate lookup instance
     factory = core.getResourceLoader().newInstance(lookupImpl, LookupFactory.class);
     lookup = factory.create(config, core);
-    
+    core.addCloseHook(new CloseHook() {
+      @Override
+      public void preClose(SolrCore core) {
+        if (lookup != null && lookup instanceof Closeable) {
+          try {
+            ((Closeable) lookup).close();
+          } catch (IOException e) {
+            LOG.warn("Could not close the suggester lookup.", e);
+          }
+        }
+      }
+      
+      @Override
+      public void postClose(SolrCore core) {}
+    });
+
     // if store directory is provided make it or load up the lookup with its content
     if (store != null) {
       storeDir = new File(store);

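The SolrSuggester change registers a CloseHook so a Lookup implementation that holds resources (some lookups implement Closeable, e.g. those backed by a side index) is closed before the core shuts down instead of leaking file handles. A stripped-down sketch of the pattern, with stand-in types in place of SolrCore and org.apache.solr.core.CloseHook:

    import java.io.Closeable;
    import java.io.IOException;

    public class CloseHookSketch {
      interface CloseHook { void preClose(); void postClose(); } // stand-in for Solr's CloseHook
      interface Core { void addCloseHook(CloseHook hook); }      // stand-in for SolrCore

      static void registerLookupCleanup(Core core, final Object lookup) {
        core.addCloseHook(new CloseHook() {
          @Override public void preClose() {
            if (lookup instanceof Closeable) { // only some Lookup impls hold resources
              try {
                ((Closeable) lookup).close();
              } catch (IOException e) {
                System.err.println("Could not close the suggester lookup: " + e);
              }
            }
          }
          @Override public void postClose() {}
        });
      }
    }

The same hook is added verbatim to Suggester.java below.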
Modified: lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/spelling/suggest/Suggester.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/spelling/suggest/Suggester.java?rev=1596817&r1=1596816&r2=1596817&view=diff
==============================================================================
--- lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/spelling/suggest/Suggester.java (original)
+++ lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/spelling/suggest/Suggester.java Thu May 22 11:38:47 2014
@@ -17,6 +17,7 @@
 
 package org.apache.solr.spelling.suggest;
 
+import java.io.Closeable;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
@@ -33,13 +34,14 @@ import org.apache.lucene.search.spell.Di
 import org.apache.lucene.search.spell.HighFrequencyDictionary;
 import org.apache.lucene.search.spell.SuggestMode;
 import org.apache.lucene.search.suggest.FileDictionary;
-import org.apache.lucene.search.suggest.Lookup.LookupResult;
 import org.apache.lucene.search.suggest.Lookup;
+import org.apache.lucene.search.suggest.Lookup.LookupResult;
 import org.apache.lucene.search.suggest.analyzing.AnalyzingSuggester;
 import org.apache.lucene.search.suggest.fst.WFSTCompletionLookup;
 import org.apache.lucene.util.CharsRef;
 import org.apache.lucene.util.IOUtils;
 import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.CloseHook;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.search.SolrIndexSearcher;
 import org.apache.solr.spelling.SolrSpellChecker;
@@ -103,6 +105,22 @@ public class Suggester extends SolrSpell
     factory = core.getResourceLoader().newInstance(lookupImpl, LookupFactory.class);
     
     lookup = factory.create(config, core);
+    core.addCloseHook(new CloseHook() {
+      @Override
+      public void preClose(SolrCore core) {
+        if (lookup != null && lookup instanceof Closeable) {
+          try {
+            ((Closeable) lookup).close();
+          } catch (IOException e) {
+            LOG.warn("Could not close the suggester lookup.", e);
+          }
+        }
+      }
+      
+      @Override
+      public void postClose(SolrCore core) {}
+    });
+    
     String store = (String)config.get(STORE_DIR);
     if (store != null) {
       storeDir = new File(store);
@@ -120,6 +138,7 @@ public class Suggester extends SolrSpell
         }
       }
     }
+    
     return name;
   }
   

Modified: lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/spelling/suggest/fst/AnalyzingInfixLookupFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/spelling/suggest/fst/AnalyzingInfixLookupFactory.java?rev=1596817&r1=1596816&r2=1596817&view=diff
==============================================================================
--- lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/spelling/suggest/fst/AnalyzingInfixLookupFactory.java (original)
+++ lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/spelling/suggest/fst/AnalyzingInfixLookupFactory.java Thu May 22 11:38:47 2014
@@ -83,6 +83,9 @@ public class AnalyzingInfixLookupFactory
     String indexPath = params.get(INDEX_PATH) != null
     ? params.get(INDEX_PATH).toString()
     : DEFAULT_INDEX_PATH;
+    if (new File(indexPath).isAbsolute() == false) {
+      indexPath = core.getDataDir() + File.separator + indexPath;
+    }
     
     int minPrefixChars = params.get(MIN_PREFIX_CHARS) != null
     ? Integer.parseInt(params.get(MIN_PREFIX_CHARS).toString())

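The three-line guard anchors a relative INDEX_PATH under the core's data directory rather than the JVM's working directory; absolute paths pass through unchanged. As a stand-alone helper (the dataDir argument stands in for core.getDataDir()):

    import java.io.File;

    public class IndexPathResolver {
      /** Resolve a configured suggester index path the way the patched factories do. */
      static String resolve(String indexPath, String dataDir) {
        if (!new File(indexPath).isAbsolute()) {
          indexPath = dataDir + File.separator + indexPath;
        }
        return indexPath;
      }
    }

The identical guard lands in BlendedInfixLookupFactory below.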
Modified: lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/spelling/suggest/fst/BlendedInfixLookupFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/spelling/suggest/fst/BlendedInfixLookupFactory.java?rev=1596817&r1=1596816&r2=1596817&view=diff
==============================================================================
--- lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/spelling/suggest/fst/BlendedInfixLookupFactory.java (original)
+++ lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/spelling/suggest/fst/BlendedInfixLookupFactory.java Thu May 22 11:38:47 2014
@@ -82,6 +82,9 @@ public class BlendedInfixLookupFactory e
     String indexPath = params.get(INDEX_PATH) != null
     ? params.get(INDEX_PATH).toString()
     : DEFAULT_INDEX_PATH;
+    if (new File(indexPath).isAbsolute() == false) {
+      indexPath = core.getDataDir() + File.separator + indexPath;
+    }
     
     int minPrefixChars = params.get(MIN_PREFIX_CHARS) != null
     ? Integer.parseInt(params.get(MIN_PREFIX_CHARS).toString())

Modified: lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/store/hdfs/HdfsDirectory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/store/hdfs/HdfsDirectory.java?rev=1596817&r1=1596816&r2=1596817&view=diff
==============================================================================
--- lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/store/hdfs/HdfsDirectory.java (original)
+++ lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/store/hdfs/HdfsDirectory.java Thu May 22 11:38:47 2014
@@ -30,12 +30,10 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.lucene.store.BaseDirectory;
-import org.apache.lucene.store.BufferedIndexOutput;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.store.NoLockFactory;
-import org.apache.lucene.util.IOUtils;
 import org.apache.solr.store.blockcache.CustomBufferedIndexInput;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -99,14 +97,11 @@ public class HdfsDirectory extends BaseD
   }
   
   @Override
-  public IndexOutput createOutput(String name, IOContext context)
-      throws IOException {
+  public IndexOutput createOutput(String name, IOContext context) throws IOException {
     if (SEGMENTS_GEN.equals(name)) {
       return new NullIndexOutput();
     }
-    HdfsFileWriter writer = new HdfsFileWriter(getFileSystem(), new Path(
-        hdfsDirPath, name));
-    return new HdfsIndexOutput(writer);
+    return new HdfsFileWriter(getFileSystem(), new Path(hdfsDirPath, name));
   }
   
   private String[] getNormalNames(List<String> files) {
@@ -233,36 +228,6 @@ public class HdfsDirectory extends BaseD
     }
   }
   
-  static class HdfsIndexOutput extends BufferedIndexOutput {
-    
-    private HdfsFileWriter writer;
-    
-    public HdfsIndexOutput(HdfsFileWriter writer) {
-      this.writer = writer;
-    }
-    
-    @Override
-    public void close() throws IOException {
-      boolean success = false;
-      try {
-        super.close();
-        success = true;
-      } finally {
-        if (success) {
-          IOUtils.close(writer);
-        } else {
-          IOUtils.closeWhileHandlingException(writer);
-        }
-      }
-    }
-
-    @Override
-    protected void flushBuffer(byte[] b, int offset, int len)
-        throws IOException {
-      writer.writeBytes(b, offset, len);
-    }
-  }
-  
   @Override
   public void sync(Collection<String> names) throws IOException {
     LOG.debug("Sync called on {}", Arrays.toString(names.toArray()));

Modified: lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/store/hdfs/HdfsFileWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/store/hdfs/HdfsFileWriter.java?rev=1596817&r1=1596816&r2=1596817&view=diff
==============================================================================
--- lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/store/hdfs/HdfsFileWriter.java (original)
+++ lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/store/hdfs/HdfsFileWriter.java Thu May 22 11:38:47 2014
@@ -17,37 +17,31 @@ package org.apache.solr.store.hdfs;
  * limitations under the License.
  */
 
-import java.io.Closeable;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.util.EnumSet;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CreateFlag;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.lucene.store.DataOutput;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.lucene.store.OutputStreamIndexOutput;
 
 /**
  * @lucene.experimental
  */
-public class HdfsFileWriter extends DataOutput implements Closeable {
-  public static Logger LOG = LoggerFactory.getLogger(HdfsFileWriter.class);
+public class HdfsFileWriter extends OutputStreamIndexOutput {
   
   public static final String HDFS_SYNC_BLOCK = "solr.hdfs.sync.block";
-  
-  private final Path path;
-  private FSDataOutputStream outputStream;
-  private long currentPosition;
+  public static final int BUFFER_SIZE = 16384;
   
   public HdfsFileWriter(FileSystem fileSystem, Path path) throws IOException {
-    LOG.debug("Creating writer on {}", path);
-    this.path = path;
-    
+    super(getOutputStream(fileSystem, path), BUFFER_SIZE);
+  }
+  
+  private static final OutputStream getOutputStream(FileSystem fileSystem, Path path) throws IOException {
     Configuration conf = fileSystem.getConf();
     FsServerDefaults fsDefaults = fileSystem.getServerDefaults(path);
     EnumSet<CreateFlag> flags = EnumSet.of(CreateFlag.CREATE,
@@ -55,45 +49,9 @@ public class HdfsFileWriter extends Data
     if (Boolean.getBoolean(HDFS_SYNC_BLOCK)) {
       flags.add(CreateFlag.SYNC_BLOCK);
     }
-    outputStream = fileSystem.create(path, FsPermission.getDefault()
+    return fileSystem.create(path, FsPermission.getDefault()
         .applyUMask(FsPermission.getUMask(conf)), flags, fsDefaults
         .getFileBufferSize(), fsDefaults.getReplication(), fsDefaults
         .getBlockSize(), null);
   }
-  
-  public long length() {
-    return currentPosition;
-  }
-  
-  public void seek(long pos) throws IOException {
-    LOG.error("Invalid seek called on {}", path);
-    throw new IOException("Seek not supported");
-  }
-  
-  public void flush() throws IOException {
-    // flush to the network, not guarantees it makes it to the DN (vs hflush)
-    outputStream.flush();
-    LOG.debug("Flushed file {}", path);
-  }
-  
-  public void close() throws IOException {
-    outputStream.close();
-    LOG.debug("Closed writer on {}", path);
-  }
-  
-  @Override
-  public void writeByte(byte b) throws IOException {
-    outputStream.write(b & 0xFF);
-    currentPosition++;
-  }
-  
-  @Override
-  public void writeBytes(byte[] b, int offset, int length) throws IOException {
-    outputStream.write(b, offset, length);
-    currentPosition += length;
-  }
-  
-  public long getPosition() {
-    return currentPosition;
-  }
 }

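Together these two hunks delete a layer: createOutput returns the HdfsFileWriter directly, and HdfsFileWriter stops hand-rolling buffering, position tracking and close handling over a raw FSDataOutputStream, delegating all of that to Lucene's OutputStreamIndexOutput. The shape of the pattern over a local file instead of HDFS, as a sketch assuming the two-argument OutputStreamIndexOutput constructor this branch uses (later Lucene versions take additional description arguments):

    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;

    import org.apache.lucene.store.OutputStreamIndexOutput;

    /** Same shape as the patched HdfsFileWriter, but writing to the local filesystem. */
    public class LocalFileWriter extends OutputStreamIndexOutput {
      static final int BUFFER_SIZE = 16384; // matches the constant the commit introduces

      public LocalFileWriter(String path) throws IOException {
        super(openStream(path), BUFFER_SIZE); // superclass buffers, tracks position, and closes
      }

      private static OutputStream openStream(String path) throws IOException {
        return new FileOutputStream(path);
      }
    }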
Modified: lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java?rev=1596817&r1=1596816&r2=1596817&view=diff
==============================================================================
--- lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java (original)
+++ lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java Thu May 22 11:38:47 2014
@@ -18,11 +18,15 @@ package org.apache.solr.update;
  */
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.ConnectException;
 import java.util.ArrayList;
 import java.util.List;
+
+import org.apache.http.HttpResponse;
 import org.apache.solr.client.solrj.SolrServer;
 import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.BinaryResponseParser;
 import org.apache.solr.client.solrj.impl.HttpSolrServer;
 import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
 import org.apache.solr.client.solrj.request.UpdateRequest;
@@ -33,6 +37,7 @@ import org.apache.solr.common.cloud.ZkSt
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.core.Diagnostics;
+import org.apache.solr.update.processor.DistributedUpdateProcessor.RequestReplicationTracker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -63,7 +68,7 @@ public class SolrCmdDistributor {
     this.retryPause = retryPause;
   }
   
-  public void finish() {
+  public void finish() {    
     try {
       servers.blockUntilFinished();
       doRetriesIfNeeded();
@@ -168,16 +173,20 @@ public class SolrCmdDistributor {
   }
   
   public void distribAdd(AddUpdateCommand cmd, List<Node> nodes, ModifiableSolrParams params) throws IOException {
-    distribAdd(cmd, nodes, params, false);
+    distribAdd(cmd, nodes, params, false, null);
   }
-  
+
   public void distribAdd(AddUpdateCommand cmd, List<Node> nodes, ModifiableSolrParams params, boolean synchronous) throws IOException {
+    distribAdd(cmd, nodes, params, synchronous, null);
+  }
+  
+  public void distribAdd(AddUpdateCommand cmd, List<Node> nodes, ModifiableSolrParams params, boolean synchronous, RequestReplicationTracker rrt) throws IOException {  
 
     for (Node node : nodes) {
       UpdateRequest uReq = new UpdateRequest();
       uReq.setParams(params);
-      uReq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite);      
-      submit(new Req(cmd.toString(), node, uReq, synchronous));
+      uReq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite);
+      submit(new Req(cmd.toString(), node, uReq, synchronous, rrt));
     }
     
   }
@@ -233,7 +242,7 @@ public class SolrCmdDistributor {
     
     try {
       SolrServer solrServer = servers.getSolrServer(req);
-      NamedList<Object> rsp = solrServer.request(req.uReq);
+      solrServer.request(req.uReq);
     } catch (Exception e) {
       SolrException.log(log, e);
       Error error = new Error();
@@ -252,12 +261,18 @@ public class SolrCmdDistributor {
     public int retries;
     public boolean synchronous;
     public String cmdString;
-    
+    public RequestReplicationTracker rfTracker;
+
     public Req(String cmdString, Node node, UpdateRequest uReq, boolean synchronous) {
+      this(cmdString, node, uReq, synchronous, null);
+    }
+    
+    public Req(String cmdString, Node node, UpdateRequest uReq, boolean synchronous, RequestReplicationTracker rfTracker) {
       this.node = node;
       this.uReq = uReq;
       this.synchronous = synchronous;
       this.cmdString = cmdString;
+      this.rfTracker = rfTracker;
     }
     
     public String toString() {
@@ -266,6 +281,37 @@ public class SolrCmdDistributor {
       sb.append("; node=").append(String.valueOf(node));
       return sb.toString();
     }
+    public void trackRequestResult(HttpResponse resp, boolean success) {      
+      if (rfTracker != null) {
+        Integer rf = null;
+        if (resp != null) {
+            // need to parse out the rf from responses to requests that were forwarded to another leader
+          InputStream inputStream = null;
+          try {
+            inputStream = resp.getEntity().getContent();
+            BinaryResponseParser brp = new BinaryResponseParser();
+            NamedList<Object> nl= brp.processResponse(inputStream, null);
+            Object hdr = nl.get("responseHeader");
+            if (hdr != null && hdr instanceof NamedList) {
+              NamedList<Object> hdrList = (NamedList<Object>)hdr;
+              Object rfObj = hdrList.get(UpdateRequest.REPFACT);
+              if (rfObj != null && rfObj instanceof Integer) {
+                rf = (Integer)rfObj;
+              }
+            }
+          } catch (Exception e) {
+            log.warn("Failed to parse response from "+node+" during replication factor accounting due to: "+e);
+          } finally {
+            if (inputStream != null) {
+              try {
+                inputStream.close();
+              } catch (Exception ignore){}
+            }
+          }
+        }
+        rfTracker.trackRequestResult(node, success, rf);
+      }
+    }
   }
     
 
@@ -296,6 +342,8 @@ public class SolrCmdDistributor {
     public abstract String getCoreName();
     public abstract String getBaseUrl();
     public abstract ZkCoreNodeProps getNodeProps();
+    public abstract String getCollection();
+    public abstract String getShardId();
   }
 
   public static class StdNode extends Node {

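trackRequestResult recovers the achieved replication factor ("rf") when an update was forwarded to another leader: it decodes the javabin response body with BinaryResponseParser and reads UpdateRequest.REPFACT out of the responseHeader. A minimal sketch of that header walk over an already-decoded response, using a plain Map as a stand-in for Solr's NamedList (the "rf" key mirrors what the commit reads via UpdateRequest.REPFACT):

    import java.util.Map;

    public class RfExtractor {
      /** Pull the achieved replication factor out of a decoded response, or null if absent. */
      @SuppressWarnings("unchecked")
      static Integer extractRf(Map<String, Object> response) {
        Object hdr = response.get("responseHeader");
        if (hdr instanceof Map) {
          Object rfObj = ((Map<String, Object>) hdr).get("rf");
          if (rfObj instanceof Integer) {
            return (Integer) rfObj;
          }
        }
        return null;
      }
    }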
Modified: lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/update/StreamingSolrServers.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/update/StreamingSolrServers.java?rev=1596817&r1=1596816&r2=1596817&view=diff
==============================================================================
--- lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/update/StreamingSolrServers.java (original)
+++ lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/update/StreamingSolrServers.java Thu May 22 11:38:47 2014
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 
+import org.apache.http.HttpResponse;
 import org.apache.http.client.HttpClient;
 import org.apache.solr.client.solrj.SolrServer;
 import org.apache.solr.client.solrj.impl.BinaryRequestWriter;
@@ -69,6 +70,7 @@ public class StreamingSolrServers {
       server = new ConcurrentUpdateSolrServer(url, httpClient, 100, 1, updateExecutor, true) {
         @Override
         public void handleError(Throwable ex) {
+          req.trackRequestResult(null, false);
           log.error("error", ex);
           Error error = new Error();
           error.e = (Exception) ex;
@@ -78,6 +80,10 @@ public class StreamingSolrServers {
           error.req = req;
           errors.add(error);
         }
+        @Override
+        public void onSuccess(HttpResponse resp) {
+          req.trackRequestResult(resp, true);
+        }
       };
       server.setParser(new BinaryResponseParser());
       server.setRequestWriter(new BinaryRequestWriter());

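These overrides complete the wiring for the tracker above: onSuccess hands the raw HttpResponse to Req.trackRequestResult so the rf can be parsed out of it, while handleError reports a failure with no response body. A generic sketch of the dual-callback shape (the types are stand-ins, not SolrJ API):

    public class CallbackWiringSketch {
      interface ResultTracker { void trackRequestResult(Object response, boolean success); }

      /** Stand-in for the anonymous ConcurrentUpdateSolrServer subclass in the hunk above. */
      static abstract class StreamingClient {
        final ResultTracker tracker;
        StreamingClient(ResultTracker tracker) { this.tracker = tracker; }

        void onSuccess(Object response) {
          tracker.trackRequestResult(response, true);  // body available for rf parsing
        }
        void handleError(Throwable ex) {
          tracker.trackRequestResult(null, false);     // no body to parse on failure
        }
      }
    }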
Modified: lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/update/UpdateLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/update/UpdateLog.java?rev=1596817&r1=1596816&r2=1596817&view=diff
==============================================================================
--- lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/update/UpdateLog.java (original)
+++ lucene/dev/branches/lucene5675/solr/core/src/java/org/apache/solr/update/UpdateLog.java Thu May 22 11:38:47 2014
@@ -222,15 +222,7 @@ public class UpdateLog implements Plugin
    * for an existing log whenever the core or update handler changes.
    */
   public void init(UpdateHandler uhandler, SolrCore core) {
-    // ulogDir from CoreDescriptor overrides
-    String ulogDir = core.getCoreDescriptor().getUlogDir();
-    if (ulogDir != null) {
-      dataDir = ulogDir;
-    }
-
-    if (dataDir == null || dataDir.length()==0) {
-      dataDir = core.getDataDir();
-    }
+    dataDir = core.getUlogDir();
 
     this.uhandler = uhandler;
 

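The UpdateLog.init simplification assumes SolrCore.getUlogDir() now owns the resolution that used to be inlined here. Judging from the removed block, the net precedence is: a non-empty ulogDir on the CoreDescriptor wins, otherwise the core's data dir is used. A sketch of that precedence with stand-in arguments:

    public class UlogDirResolution {
      /** Mirrors the net effect of the block this commit removes from UpdateLog.init. */
      static String resolveUlogDir(String descriptorUlogDir, String coreDataDir) {
        if (descriptorUlogDir != null && descriptorUlogDir.length() > 0) {
          return descriptorUlogDir; // explicit override from the CoreDescriptor wins
        }
        return coreDataDir;         // otherwise the update log lives under the data dir
      }
    }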

