Return-Path: X-Original-To: apmail-lucene-commits-archive@www.apache.org Delivered-To: apmail-lucene-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 7BAA9979D for ; Sun, 9 Oct 2011 23:46:35 +0000 (UTC) Received: (qmail 50746 invoked by uid 500); 9 Oct 2011 23:46:35 -0000 Mailing-List: contact commits-help@lucene.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@lucene.apache.org Delivered-To: mailing list commits@lucene.apache.org Received: (qmail 50739 invoked by uid 99); 9 Oct 2011 23:46:35 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 09 Oct 2011 23:46:35 +0000 X-ASF-Spam-Status: No, hits=-1998.0 required=5.0 tests=ALL_TRUSTED,FB_GET_MEDS X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 09 Oct 2011 23:46:24 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id C53402388A3F; Sun, 9 Oct 2011 23:46:01 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1180745 [1/2] - in /lucene/dev/branches/solrcloud: solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/ solr/core/src/java/org/apache/solr/client/solrj/embedded/ solr/core/src/java/org/apache/solr/cloud/ solr/core/sr... Date: Sun, 09 Oct 2011 23:46:00 -0000 To: commits@lucene.apache.org From: markrmiller@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20111009234601.C53402388A3F@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: markrmiller Date: Sun Oct 9 23:45:59 2011 New Revision: 1180745 URL: http://svn.apache.org/viewvc?rev=1180745&view=rev Log: SOLR-2358: Distributing Indexing - early infrastructure and tests Added: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (with props) lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java (with props) lucene/dev/branches/solrcloud/solr/core/src/test-files/solr/conf/solrconfig-distrib-update.xml (with props) lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java (with props) lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/update/processor/TestDistributedUpdate.java (with props) lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequestExt.java (with props) lucene/dev/branches/solrcloud/src/ Modified: lucene/dev/branches/solrcloud/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestContentStreamDataSource.java lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/SliceLeaderElector.java lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/CoreDescriptor.java lucene/dev/branches/solrcloud/solr/core/src/test-files/solr/conf/solrconfig-nocache.xml lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/TestSolrCoreProperties.java lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/AbstractZkTestCase.java lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/handler/TestReplicationHandler.java lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/schema/TestBinaryField.java lucene/dev/branches/solrcloud/solr/solrj/src/test/org/apache/solr/client/solrj/TestLBHttpSolrServer.java lucene/dev/branches/solrcloud/solr/solrj/src/test/org/apache/solr/client/solrj/embedded/MultiCoreExampleJettyTest.java lucene/dev/branches/solrcloud/solr/test-framework/src/java/org/apache/solr/BaseDistributedSearchTestCase.java lucene/dev/branches/solrcloud/solr/test-framework/src/java/org/apache/solr/SolrJettyTestBase.java Modified: lucene/dev/branches/solrcloud/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestContentStreamDataSource.java URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestContentStreamDataSource.java?rev=1180745&r1=1180744&r2=1180745&view=diff ============================================================================== --- lucene/dev/branches/solrcloud/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestContentStreamDataSource.java (original) +++ lucene/dev/branches/solrcloud/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestContentStreamDataSource.java Sun Oct 9 23:45:59 2011 @@ -145,9 +145,8 @@ public class TestContentStreamDataSource } private JettySolrRunner createJetty(SolrInstance instance) throws Exception { - System.setProperty("solr.solr.home", instance.getHomeDir()); System.setProperty("solr.data.dir", instance.getDataDir()); - JettySolrRunner jetty = new JettySolrRunner("/solr", 0); + JettySolrRunner jetty = new JettySolrRunner(instance.getHomeDir(), "/solr", 0); jetty.start(); return jetty; } Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java?rev=1180745&r1=1180744&r2=1180745&view=diff ============================================================================== --- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java (original) +++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java Sun Oct 9 23:45:59 2011 @@ -51,19 +51,20 @@ public class JettySolrRunner { private boolean waitOnSolr = false; - public JettySolrRunner(String context, int port) { - this.init(context, port); + public JettySolrRunner(String solrHome, String context, int port) { + this.init(solrHome, context, port); } - public JettySolrRunner(String context, int port, String solrConfigFilename) { - this.init(context, port); + public JettySolrRunner(String solrHome, String context, int port, String solrConfigFilename) { + this.init(solrHome, context, port); this.solrConfigFilename = solrConfigFilename; } - private void init(String context, int port) { + private void init(String solrHome, String context, int port) { this.context = context; server = new Server(port); server.setStopAtShutdown(true); + System.setProperty("solr.solr.home", solrHome); if (System.getProperty("jetty.testMode") != null) { // SelectChannelConnector connector = new SelectChannelConnector(); // Normal SocketConnector is what solr's example server uses by default @@ -99,6 +100,8 @@ public class JettySolrRunner { Handler.REQUEST); if (solrConfigFilename != null) System.clearProperty("solrconfig"); + + System.clearProperty("solr.solr.home"); } public void lifeCycleFailure(LifeCycle arg0, Throwable arg1) { @@ -172,7 +175,7 @@ public class JettySolrRunner { */ public static void main(String[] args) { try { - JettySolrRunner jetty = new JettySolrRunner("/solr", 8983); + JettySolrRunner jetty = new JettySolrRunner(".", "/solr", 8983); jetty.start(); } catch (Exception ex) { ex.printStackTrace(); Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/SliceLeaderElector.java URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/SliceLeaderElector.java?rev=1180745&r1=1180744&r2=1180745&view=diff ============================================================================== --- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/SliceLeaderElector.java (original) +++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/SliceLeaderElector.java Sun Oct 9 23:45:59 2011 @@ -17,6 +17,7 @@ package org.apache.solr.cloud; * limitations under the License. */ +import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.Collections; @@ -27,6 +28,7 @@ import java.util.regex.Pattern; import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.SolrZkClient; +import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.cloud.ZooKeeperException; import org.apache.zookeeper.CreateMode; @@ -77,13 +79,15 @@ public class SliceLeaderElector { * @param collection * @param seq * @param leaderId + * @param props * @throws KeeperException * @throws InterruptedException + * @throws IOException * @throws UnsupportedEncodingException */ private void checkIfIamLeader(final String shardId, final String collection, - final int seq, final String leaderId) throws KeeperException, - InterruptedException { + final int seq, final String leaderId, final ZkNodeProps props) throws KeeperException, + InterruptedException, IOException { // get all other numbers... String holdElectionPath = getElectionPath(shardId, collection) + ELECTION_NODE; @@ -91,7 +95,7 @@ public class SliceLeaderElector { sortSeqs(seqs); List intSeqs = getSeqs(seqs); if (seq <= intSeqs.get(0)) { - runIamLeaderProcess(shardId, collection, leaderId); + runIamLeaderProcess(shardId, collection, leaderId, props); } else { // I am not the leader - watch the node below me int i = 1; @@ -111,7 +115,7 @@ public class SliceLeaderElector { public void process(WatchedEvent event) { // am I the next leader? try { - checkIfIamLeader(shardId, collection, seq, leaderId); + checkIfIamLeader(shardId, collection, seq, leaderId, props); } catch (KeeperException e) { log.warn("", e); @@ -119,6 +123,8 @@ public class SliceLeaderElector { // Restore the interrupted status Thread.currentThread().interrupt(); log.warn("", e); + } catch (IOException e) { + log.warn("", e); } } @@ -126,17 +132,18 @@ public class SliceLeaderElector { } catch (KeeperException e) { // we couldn't set our watch - the node before us may already be down? // we need to check if we are the leader again - checkIfIamLeader(shardId, collection, seq, leaderId); + checkIfIamLeader(shardId, collection, seq, leaderId, props); } } } private void runIamLeaderProcess(final String shardId, - final String collection, final String leaderId) throws KeeperException, - InterruptedException { + final String collection, final String leaderId, ZkNodeProps props) throws KeeperException, + InterruptedException, IOException { String currentLeaderZkPath = getElectionPath(shardId, collection) + LEADER_NODE; - zkClient.makePath(currentLeaderZkPath + "/" + leaderId, CreateMode.EPHEMERAL); + // TODO: leader election tests do not currently set the props + zkClient.makePath(currentLeaderZkPath + "/" + leaderId, props == null ? null : props.store(), CreateMode.EPHEMERAL); } /** @@ -193,14 +200,15 @@ public class SliceLeaderElector { * @param shardId * @param collection * @param shardZkNodeName + * @param props * @return sequential node number * @throws KeeperException * @throws InterruptedException + * @throws IOException * @throws UnsupportedEncodingException */ public int joinElection(String shardId, String collection, - String shardZkNodeName) throws KeeperException, InterruptedException, - UnsupportedEncodingException { + String shardZkNodeName, ZkNodeProps props) throws KeeperException, InterruptedException, IOException { final String shardsElectZkPath = getElectionPath(shardId, collection) + SliceLeaderElector.ELECTION_NODE; @@ -209,6 +217,7 @@ public class SliceLeaderElector { int tries = 0; while (cont) { try { + leaderSeqPath = zkClient.create(shardsElectZkPath + "/n_", null, CreateMode.EPHEMERAL_SEQUENTIAL); cont = false; @@ -224,7 +233,7 @@ public class SliceLeaderElector { } } int seq = getSeq(leaderSeqPath); - checkIfIamLeader(shardId, collection, seq, shardZkNodeName); + checkIfIamLeader(shardId, collection, seq, shardZkNodeName, props); return seq; } Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1180745&r1=1180744&r2=1180745&view=diff ============================================================================== --- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original) +++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java Sun Oct 9 23:45:59 2011 @@ -19,7 +19,6 @@ package org.apache.solr.cloud; import java.io.File; import java.io.IOException; -import java.io.UnsupportedEncodingException; import java.net.InetAddress; import java.util.Iterator; import java.util.List; @@ -500,6 +499,7 @@ public final class ZkController { if (shardId == null) { shardId = assignShard.assignShard(collection, 3); // nocommit: hard coded // number of slices + cloudDesc.setShardId(shardId); } String shardsZkPath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + ZkStateReader.SHARDS_ZKNODE + "/" + shardId; @@ -510,10 +510,7 @@ public final class ZkController { + shardUrl); } - ZkNodeProps props = new ZkNodeProps(); - props.put(ZkStateReader.URL_PROP, shardUrl); - - props.put(ZkStateReader.NODE_NAME, getNodeName()); + ZkNodeProps props = getShardZkProps(shardUrl); byte[] bytes = props.store(); @@ -544,15 +541,24 @@ public final class ZkController { } // leader election - doLeaderElectionProcess(shardId, collection, shardZkNodeName); + doLeaderElectionProcess(shardId, collection, shardZkNodeName, props); return shardId; } + + private ZkNodeProps getShardZkProps(String shardUrl) { + ZkNodeProps props = new ZkNodeProps(); + props.put(ZkStateReader.URL_PROP, shardUrl); + + props.put(ZkStateReader.NODE_NAME, getNodeName()); + return props; + } + private void doLeaderElectionProcess(String shardId, - final String collection, String shardZkNodeName) throws KeeperException, - InterruptedException, UnsupportedEncodingException { + final String collection, String shardZkNodeName, ZkNodeProps props) throws KeeperException, + InterruptedException, IOException { - leaderElector.joinElection(shardId, collection, shardZkNodeName); + leaderElector.joinElection(shardId, collection, shardZkNodeName, props); } /** Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/CoreDescriptor.java URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/CoreDescriptor.java?rev=1180745&r1=1180744&r2=1180745&view=diff ============================================================================== --- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/CoreDescriptor.java (original) +++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/CoreDescriptor.java Sun Oct 9 23:45:59 2011 @@ -47,7 +47,6 @@ public class CoreDescriptor { this.cloudDesc = new CloudDescriptor(); // cloud collection defaults to core name cloudDesc.setCollectionName(name.isEmpty() ? coreContainer.getDefaultCoreName() : name); - this.cloudDesc.setShardId(coreContainer.getZkController().getNodeName() + "_" + name); } if (name == null) { Added: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1180745&view=auto ============================================================================== --- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (added) +++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Sun Oct 9 23:45:59 2011 @@ -0,0 +1,439 @@ +package org.apache.solr.update.processor; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.Future; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.httpclient.HttpClient; +import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager; +import org.apache.solr.client.solrj.SolrServer; +import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer; +import org.apache.solr.client.solrj.request.AbstractUpdateRequest; +import org.apache.solr.client.solrj.request.UpdateRequestExt; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.SolrException.ErrorCode; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.SolrInputField; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.util.NamedList; +import org.apache.solr.common.util.StrUtils; +import org.apache.solr.core.SolrCore; +import org.apache.solr.request.SolrQueryRequest; +import org.apache.solr.response.SolrQueryResponse; +import org.apache.solr.schema.SchemaField; +import org.apache.solr.update.AddUpdateCommand; +import org.apache.solr.update.CommitUpdateCommand; +import org.apache.solr.update.DeleteUpdateCommand; + +// NOT mt-safe... create a new processor for each add thread +public class DistributedUpdateProcessor extends UpdateRequestProcessor { + // TODO: shut this thing down + static ThreadPoolExecutor commExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, + 5, TimeUnit.SECONDS, new SynchronousQueue()); + + static HttpClient client; + + static { + MultiThreadedHttpConnectionManager mgr = new MultiThreadedHttpConnectionManager(); + mgr.getParams().setDefaultMaxConnectionsPerHost(8); + mgr.getParams().setMaxTotalConnections(200); + client = new HttpClient(mgr); + } + + CompletionService completionService; + Set> pending; + + private final SolrQueryRequest req; + private final SolrQueryResponse rsp; + private final UpdateRequestProcessor next;; + private final SchemaField idField; + + private List shards; + private final List[] adds; + private final List[] deletes; + + String selfStr; + int self; + int maxBufferedAddsPerServer = 10; + int maxBufferedDeletesPerServer = 100; + + private DistributedUpdateProcessorFactory factory; + + public DistributedUpdateProcessor(String shardStr, SolrQueryRequest req, + SolrQueryResponse rsp, DistributedUpdateProcessorFactory factory, + UpdateRequestProcessor next) { + super(next); + this.factory = factory; + this.req = req; + this.rsp = rsp; + this.next = next; + this.idField = req.getSchema().getUniqueKeyField(); + + shards = factory.shards; + + String selfStr = req.getParams().get("self", factory.selfStr); + + if (shardStr != null) { + shards = StrUtils.splitSmart(shardStr, ",", true); + } + + self = -1; + if (shards != null) { + for (int i = 0; i < shards.size(); i++) { + if (shards.get(i).equals(selfStr)) { + self = i; + break; + } + } + } + + if (shards == null) { + shards = new ArrayList(1); + shards.add("self"); + self = 0; + } + + adds = new List[shards.size()]; + deletes = new List[shards.size()]; + } + + private int getSlot(String id) { + return (id.hashCode() >>> 1) % shards.size(); + } + + @Override + public void processAdd(AddUpdateCommand cmd) throws IOException { + checkResponses(false); + + SolrInputDocument doc = cmd.getSolrInputDocument(); + SolrInputField field = doc.getField(idField.getName()); + if (field == null) { + if (next != null) next.processAdd(cmd); + return; + } + String id = field.getFirstValue().toString(); + int slot = getSlot(id); + if (slot == self) { + if (next != null) next.processAdd(cmd); + return; + } + + // make sure any pending deletes are flushed + flushDeletes(slot, 1, null); + + // TODO: this is brittle + // need to make a clone since these commands may be reused + AddUpdateCommand clone = new AddUpdateCommand(req); + + clone.solrDoc = cmd.solrDoc; + clone.commitWithin = cmd.commitWithin; + clone.overwrite = cmd.overwrite; + + // nocommit: review as far as SOLR-2685 + // clone.indexedId = cmd.indexedId; + // clone.doc = cmd.doc; + + List alist = adds[slot]; + if (alist == null) { + alist = new ArrayList(2); + adds[slot] = alist; + } + alist.add(clone); + + flushAdds(slot, maxBufferedAddsPerServer, null); + } + + // TODO: this is brittle + private DeleteUpdateCommand clone(DeleteUpdateCommand cmd) { + DeleteUpdateCommand c = new DeleteUpdateCommand(req); + c.id = cmd.id; + c.query = cmd.query; + return c; + } + + private void doDelete(int slot, DeleteUpdateCommand cmd) throws IOException { + if (slot == self) { + if (self >= 0) next.processDelete(cmd); + return; + } + + flushAdds(slot, 1, null); + + List dlist = deletes[slot]; + if (dlist == null) { + dlist = new ArrayList(2); + deletes[slot] = dlist; + } + dlist.add(clone(cmd)); + + flushDeletes(slot, maxBufferedDeletesPerServer, null); + } + + @Override + public void processDelete(DeleteUpdateCommand cmd) throws IOException { + checkResponses(false); + + if (cmd.id != null) { + doDelete(getSlot(cmd.id), cmd); + } else if (cmd.query != null) { + // query must be broadcast to all + for (int slot = 0; slot < deletes.length; slot++) { + if (slot == self) continue; + doDelete(slot, cmd); + } + doDelete(self, cmd); + } + } + + @Override + public void processCommit(CommitUpdateCommand cmd) throws IOException { + // Wait for all outstanding repsonses to make sure that a commit + // can't sneak in ahead of adds or deletes we already sent. + // We could do this on a per-server basis, but it's more complex + // and this solution will lead to commits happening closer together. + checkResponses(true); + + for (int slot = 0; slot < shards.size(); slot++) { + if (slot == self) continue; + // piggyback on any outstanding adds or deletes if possible. + if (flushAdds(slot, 1, cmd)) continue; + if (flushDeletes(slot, 1, cmd)) continue; + + UpdateRequestExt ureq = new UpdateRequestExt(); + // pass on version + if (ureq.getParams() == null) { + ureq.setParams(new ModifiableSolrParams()); + } + if (req.getParams().get(DistributedUpdateProcessorFactory.DOCVERSION) != null) { + ureq.getParams().add(DistributedUpdateProcessorFactory.DOCVERSION, + req.getParams().get(DistributedUpdateProcessorFactory.DOCVERSION)); + } + ureq.getParams().add("update.chain", "distrib-update-chain"); + addCommit(ureq, cmd); + submit(slot, ureq); + } + if (next != null && self >= 0) next.processCommit(cmd); + + // if the command wanted to block until everything was committed, + // then do that here. + // nocommit + if (/* cmd.waitFlush || */cmd.waitSearcher) { + checkResponses(true); + } + } + + @Override + public void finish() throws IOException { + for (int slot = 0; slot < shards.size(); slot++) { + if (slot == self) continue; + // piggyback on any outstanding adds or deletes if possible. + flushAdds(slot, 1, null); + flushDeletes(slot, 1, null); + } + checkResponses(true); + if (next != null && self >= 0) next.finish(); + } + + void checkResponses(boolean block) { + while (pending != null && pending.size() > 0) { + try { + Future future = block ? completionService.take() + : completionService.poll(); + if (future == null) return; + pending.remove(future); + + try { + Request sreq = future.get(); + if (sreq.rspCode != 0) { + // error during request + + // use the first exception encountered + if (rsp.getException() == null) { + Exception e = sreq.exception; + String newMsg = "shard update error (" + sreq.shard + "):" + + e.getMessage(); + if (e instanceof SolrException) { + SolrException se = (SolrException) e; + e = new SolrException(ErrorCode.getErrorCode(se.code()), + newMsg, se.getCause()); + } else { + e = new SolrException(SolrException.ErrorCode.SERVER_ERROR, + "newMsg", e); + } + rsp.setException(e); + } + + SolrException.logOnce(SolrCore.log, "shard update error (" + + sreq.shard + ")", sreq.exception); + } + + } catch (ExecutionException e) { + // shouldn't happen since we catch exceptions ourselves + SolrException.log(SolrCore.log, + "error sending update request to shard", e); + } + + } catch (InterruptedException e) { + throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, + "interrupted waiting for shard update response", e); + } + } + } + + void addCommit(UpdateRequestExt ureq, CommitUpdateCommand cmd) { + if (cmd == null) return; + // nocommit + ureq.setAction(cmd.optimize ? AbstractUpdateRequest.ACTION.OPTIMIZE + : AbstractUpdateRequest.ACTION.COMMIT, false, cmd.waitSearcher); + } + + boolean flushAdds(int slot, int limit, CommitUpdateCommand ccmd) { + // check for pending deletes + List alist = adds[slot]; + if (alist == null || alist.size() < limit) return false; + + UpdateRequestExt ureq = new UpdateRequestExt(); + // pass on version + if (ureq.getParams() == null) { + ureq.setParams(new ModifiableSolrParams()); + } + if (req.getParams().get(DistributedUpdateProcessorFactory.DOCVERSION) != null) { + ureq.getParams().add(DistributedUpdateProcessorFactory.DOCVERSION, + req.getParams().get(DistributedUpdateProcessorFactory.DOCVERSION)); + } + ureq.getParams().add("update.chain", "distrib-update-chain"); + addCommit(ureq, ccmd); + + for (AddUpdateCommand cmd : alist) { + ureq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite); + } + + adds[slot] = null; + submit(slot, ureq); + return true; + } + + boolean flushDeletes(int slot, int limit, CommitUpdateCommand ccmd) { + // check for pending deletes + List dlist = deletes[slot]; + if (dlist == null || dlist.size() < limit) return false; + + UpdateRequestExt ureq = new UpdateRequestExt(); + // pass on version + if (ureq.getParams() == null) { + ureq.setParams(new ModifiableSolrParams()); + } + if (req.getParams().get(DistributedUpdateProcessorFactory.DOCVERSION) != null) { + ureq.getParams().add(DistributedUpdateProcessorFactory.DOCVERSION, + req.getParams().get(DistributedUpdateProcessorFactory.DOCVERSION)); + } + ureq.getParams().add("update.chain", "distrib-update-chain"); + addCommit(ureq, ccmd); + for (DeleteUpdateCommand cmd : dlist) { + if (cmd.id != null) { + ureq.deleteById(cmd.id); + } + if (cmd.query != null) { + ureq.deleteByQuery(cmd.query); + } + } + + deletes[slot] = null; + submit(slot, ureq); + return true; + } + + static class Request { + String shard; + UpdateRequestExt ureq; + NamedList ursp; + int rspCode; + Exception exception; + } + + void submit(int slot, UpdateRequestExt ureq) { + Request sreq = new Request(); + sreq.shard = shards.get(slot); + sreq.ureq = ureq; + submit(sreq); + } + + void submit(final Request sreq) { + if (completionService == null) { + completionService = new ExecutorCompletionService(commExecutor); + pending = new HashSet>(); + } + String[] shards; + // look to see if we should send to multiple servers + if (sreq.shard.contains("|")) { + shards = sreq.shard.split("\\|"); + } else { + shards = new String[1]; + shards[0] = sreq.shard; + } + for (final String shard : shards) { + // TODO: when we break up shards, we might forward + // to self again - makes things simple here, but we could + // also have realized this before, done the req locally, and + // removed self from this list. + + Callable task = new Callable() { + @Override + public Request call() throws Exception { + + try { + String url; + if (!shard.startsWith("http://")) { + url = "http://" + sreq.shard; + } else { + url = shard; + } + + // TODO: allow shard syntax to use : to specify replicas + SolrServer server = new CommonsHttpSolrServer(url, client); + sreq.ursp = server.request(sreq.ureq); + + // currently no way to get the request body. + } catch (Exception e) { + sreq.exception = e; + if (e instanceof SolrException) { + sreq.rspCode = ((SolrException) e).code(); + } else { + sreq.rspCode = -1; + } + } + return sreq; + } + }; + + pending.add(completionService.submit(task)); + } + } +} Added: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java?rev=1180745&view=auto ============================================================================== --- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java (added) +++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java Sun Oct 9 23:45:59 2011 @@ -0,0 +1,165 @@ +package org.apache.solr.update.processor; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.solr.common.SolrException; +import org.apache.solr.common.cloud.CloudState; +import org.apache.solr.common.cloud.Slice; +import org.apache.solr.common.cloud.SolrZkClient; +import org.apache.solr.common.cloud.ZkNodeProps; +import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.cloud.ZooKeeperException; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.util.NamedList; +import org.apache.solr.common.util.StrUtils; +import org.apache.solr.core.CoreDescriptor; +import org.apache.solr.request.SolrQueryRequest; +import org.apache.solr.response.SolrQueryResponse; +import org.apache.zookeeper.KeeperException; + +public class DistributedUpdateProcessorFactory extends + UpdateRequestProcessorFactory { + public static final String DOCVERSION = "docversion"; + NamedList args; + List shards; + String selfStr; + String shardsString; + + @Override + public void init(NamedList args) { + selfStr = (String) args.get("self"); + Object o = args.get("shards"); + if (o != null && o instanceof List) { + shards = (List) o; + shardsString = StrUtils.join((List) o, ','); + } else if (o != null && o instanceof String) { + shards = StrUtils.splitSmart((String) o, ",", true); + shardsString = (String) o; + } + } + + /** return the list of shards, or null if not configured */ + public List getShards() { + return shards; + } + + public String getShardsString() { + return shardsString; + } + + /** return "self", or null if not configured */ + public String getSelf() { + return selfStr; + } + + @Override + public DistributedUpdateProcessor getInstance(SolrQueryRequest req, + SolrQueryResponse rsp, UpdateRequestProcessor next) { + CoreDescriptor coreDesc = req.getCore().getCoreDescriptor(); + + // TODO: could do this here, or in a previous update processor. + // if we are in zk mode... + if (coreDesc.getCoreContainer().getZkController() != null) { + // the leader is... + // TODO: if there is no leader, wait and look again + // TODO: we are reading the leader from zk every time - we should cache + // this + // and watch for changes + List leaderChildren; + String collection = coreDesc.getCloudDescriptor().getCollectionName(); + String shardId = coreDesc.getCloudDescriptor().getShardId(); + ModifiableSolrParams params = new ModifiableSolrParams(req.getParams()); + String leaderNode = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + + ZkStateReader.LEADER_ELECT_ZKNODE + "/" + shardId + "/leader"; + SolrZkClient zkClient = coreDesc.getCoreContainer().getZkController() + .getZkClient(); + try { + leaderChildren = zkClient.getChildren(leaderNode, null); + if (leaderChildren.size() > 0) { + String leader = leaderChildren.get(0); + ZkNodeProps zkNodeProps = new ZkNodeProps(); + byte[] bytes = zkClient + .getData(leaderNode + "/" + leader, null, null); + zkNodeProps.load(bytes); + String leaderUrl = zkNodeProps.get("url"); + + String nodeName = req.getCore().getCoreDescriptor() + .getCoreContainer().getZkController().getNodeName(); + String shardZkNodeName = nodeName + "_" + req.getCore().getName(); + + if (params.get(DOCVERSION) != null + && params.get(DOCVERSION).equals("yes")) { + // we got a version, just go local + } else if (shardZkNodeName.equals(leader)) { + // that means I want to forward onto my replicas... + + // so get the replicas... + CloudState cloudState = req.getCore().getCoreDescriptor() + .getCoreContainer().getZkController().getCloudState(); + Slice replicas = cloudState.getSlices(collection).get(shardId); + Map shardMap = replicas.getShards(); + String self = null; + StringBuilder replicasUrl = new StringBuilder(); + for (Entry entry : shardMap.entrySet()) { + if (replicasUrl.length() > 0) { + replicasUrl.append("|"); + } + String replicaUrl = entry.getValue().get("url"); + if (shardZkNodeName.equals(entry.getKey())) { + self = replicaUrl; + } + replicasUrl.append(replicaUrl); + } + versionDoc(params); + params.add("self", self); + params.add("shards", replicasUrl.toString()); + } else { + // I need to forward onto the leader... + // TODO: don't use leader - we need to get the real URL from the zk + // node + params.add("shards", leaderUrl); + } + req.setParams(params); + } + } catch (KeeperException e) { + throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", + e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", + e); + } catch (IOException e) { + throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", + e); + } + } + + String shardStr = req.getParams().get("shards"); + if (shards == null && shardStr == null) return null; + return new DistributedUpdateProcessor(shardStr, req, rsp, this, next); + } + + private void versionDoc(ModifiableSolrParams params) { + params.set(DOCVERSION, "yes"); + } +} Added: lucene/dev/branches/solrcloud/solr/core/src/test-files/solr/conf/solrconfig-distrib-update.xml URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test-files/solr/conf/solrconfig-distrib-update.xml?rev=1180745&view=auto ============================================================================== --- lucene/dev/branches/solrcloud/solr/core/src/test-files/solr/conf/solrconfig-distrib-update.xml (added) +++ lucene/dev/branches/solrcloud/solr/core/src/test-files/solr/conf/solrconfig-distrib-update.xml Sun Oct 9 23:45:59 2011 @@ -0,0 +1,52 @@ + + + + + + + + ${tests.luceneMatchVersion:LUCENE_CURRENT} + ${solr.data.dir:} + + + + + + + + + + + + + + + + + + + + + 10 + + + + + Modified: lucene/dev/branches/solrcloud/solr/core/src/test-files/solr/conf/solrconfig-nocache.xml URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test-files/solr/conf/solrconfig-nocache.xml?rev=1180745&r1=1180744&r2=1180745&view=diff ============================================================================== --- lucene/dev/branches/solrcloud/solr/core/src/test-files/solr/conf/solrconfig-nocache.xml (original) +++ lucene/dev/branches/solrcloud/solr/core/src/test-files/solr/conf/solrconfig-nocache.xml Sun Oct 9 23:45:59 2011 @@ -30,8 +30,11 @@ solr.RAMDirectoryFactory is memory based and not persistent. --> + ${solr.data.dir:} + + Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/TestSolrCoreProperties.java URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/TestSolrCoreProperties.java?rev=1180745&r1=1180744&r2=1180745&view=diff ============================================================================== --- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/TestSolrCoreProperties.java (original) +++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/TestSolrCoreProperties.java Sun Oct 9 23:45:59 2011 @@ -45,10 +45,9 @@ public class TestSolrCoreProperties exte public void setUp() throws Exception { super.setUp(); setUpMe(); - System.setProperty("solr.solr.home", getHomeDir()); System.setProperty("solr.data.dir", getDataDir()); - solrJetty = new JettySolrRunner("/solr", 0); + solrJetty = new JettySolrRunner(getHomeDir(), "/solr", 0); solrJetty.start(); String url = "http://localhost:" + solrJetty.getLocalPort() + "/solr"; Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/AbstractZkTestCase.java URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/AbstractZkTestCase.java?rev=1180745&r1=1180744&r2=1180745&view=diff ============================================================================== --- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/AbstractZkTestCase.java (original) +++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/AbstractZkTestCase.java Sun Oct 9 23:45:59 2011 @@ -84,6 +84,7 @@ public abstract class AbstractZkTestCase putConfig(zkClient, config); putConfig(zkClient, schema); + putConfig(zkClient, "solrconfig-distrib-update.xml"); putConfig(zkClient, "stopwords.txt"); putConfig(zkClient, "protwords.txt"); putConfig(zkClient, "mapping-ISOLatin1Accent.txt"); Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java?rev=1180745&r1=1180744&r2=1180745&view=diff ============================================================================== --- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java (original) +++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java Sun Oct 9 23:45:59 2011 @@ -19,13 +19,11 @@ package org.apache.solr.cloud; import java.net.MalformedURLException; -import org.apache.solr.SolrTestCaseJ4; import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.impl.CloudSolrServer; import org.apache.solr.client.solrj.response.QueryResponse; import org.apache.solr.common.params.CommonParams; import org.apache.solr.common.params.ModifiableSolrParams; -import org.junit.BeforeClass; /** * @@ -56,12 +54,6 @@ public class BasicDistributedZkTest exte System.setProperty("CLOUD_UPDATE_DELAY", "0"); } - - - @BeforeClass - public static void beforeClass() throws Exception { - System.setProperty("solr.solr.home", SolrTestCaseJ4.TEST_HOME()); - } @Override protected void setDistributedParams(ModifiableSolrParams params) { Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java?rev=1180745&r1=1180744&r2=1180745&view=diff ============================================================================== --- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java (original) +++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java Sun Oct 9 23:45:59 2011 @@ -243,18 +243,18 @@ public class CloudStateUpdateTest extend cloudState2 = zkController2.getCloudState(); slices = cloudState2.getSlices("testcore"); - if (slices != null && slices.containsKey(host + ":1661_solr_testcore") - && slices.get(host + ":1661_solr_testcore").getShards().size() > 0) { + if (slices != null && slices.containsKey("shard1") + && slices.get("shard1").getShards().size() > 0) { break; } Thread.sleep(500); } assertNotNull(slices); - assertTrue(slices.containsKey(host + ":1661_solr_testcore")); + assertTrue(slices.containsKey("shard1")); - Slice slice = slices.get(host + ":1661_solr_testcore"); - assertEquals(host + ":1661_solr_testcore", slice.getName()); + Slice slice = slices.get("shard1"); + assertEquals("shard1", slice.getName()); Map shards = slice.getShards(); Added: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java?rev=1180745&view=auto ============================================================================== --- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java (added) +++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java Sun Oct 9 23:45:59 2011 @@ -0,0 +1,333 @@ +package org.apache.solr.cloud; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.IOException; +import java.net.MalformedURLException; + +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.embedded.JettySolrRunner; +import org.apache.solr.client.solrj.impl.CloudSolrServer; +import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer; +import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.cloud.SolrZkClient; +import org.apache.solr.common.params.CommonParams; +import org.apache.solr.common.params.ModifiableSolrParams; + +/** + * + */ +public class FullDistributedZkTest extends AbstractDistributedZkTestCase { + + private static final String DEFAULT_COLLECTION = "collection1"; + private static final boolean DEBUG = false; + String t1="a_t"; + String i1="a_si"; + String nint = "n_i"; + String tint = "n_ti"; + String nfloat = "n_f"; + String tfloat = "n_tf"; + String ndouble = "n_d"; + String tdouble = "n_td"; + String nlong = "n_l"; + String tlong = "other_tl1"; + String ndate = "n_dt"; + String tdate = "n_tdt"; + + String oddField="oddField_s"; + String missingField="ignore_exception__missing_but_valid_field_t"; + String invalidField="ignore_exception__invalid_field_not_in_schema"; + + public FullDistributedZkTest() { + fixShardCount = true; + shardCount = 6; + System.setProperty("CLOUD_UPDATE_DELAY", "0"); + } + + @Override + protected void createServers(int numShards) throws Exception { + System.setProperty("collection", "control_collection"); + controlJetty = createJetty(testDir, testDir + "/control/data", "control_shard"); + System.clearProperty("collection"); + controlClient = createNewSolrServer(controlJetty.getLocalPort()); + + StringBuilder sb = new StringBuilder(); + for (int i = 1; i <= numShards; i++) { + if (sb.length() > 0) sb.append(','); + JettySolrRunner j = createJetty(testDir, testDir + "/jetty" + i, null, "solrconfig-distrib-update.xml"); + jettys.add(j); + clients.add(createNewSolrServer(j.getLocalPort())); + + } + + // build the shard string + for (int i = 1; i <= numShards/2; i++) { + JettySolrRunner j = jettys.get(i); + JettySolrRunner j2 = jettys.get(i + (numShards/2 - 1)); + if (sb.length() > 0) sb.append(','); + sb.append("localhost:").append(j.getLocalPort()).append(context); + sb.append("|localhost:").append(j2.getLocalPort()).append(context); + } + shards = sb.toString(); + } + + @Override + protected void setDistributedParams(ModifiableSolrParams params) { + + if (r.nextBoolean()) { + // don't set shards, let that be figured out from the cloud state + params.set("distrib", "true"); + } else { + // use shard ids rather than physical locations + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < shardCount / 2; i++) { + if (i > 0) + sb.append(','); + sb.append("shard" + (i+1)); + } + params.set("shards", sb.toString()); + params.set("distrib", "true"); + } + } + + @Override + protected void indexDoc(SolrInputDocument doc) throws IOException, SolrServerException { + controlClient.add(doc); + + boolean pick = random.nextBoolean(); + + int mod = (clients.size() / 2); + + int which = (doc.getField(id).toString().hashCode() & 0x7fffffff) % mod; + + if (pick) { + which = which + mod; + } + + CommonsHttpSolrServer client = (CommonsHttpSolrServer) clients.get(which); + + UpdateRequest ureq = new UpdateRequest(); + ureq.add(doc); + ureq.setParam("update.chain", "distrib-update-chain"); + ureq.process(client); + } + + /* (non-Javadoc) + * @see org.apache.solr.BaseDistributedSearchTestCase#doTest() + * + * Create 3 shards, each with one replica + */ + @Override + public void doTest() throws Exception { + printLayout(); + // make sure 'shard1' was auto-assigned + SolrZkClient zkClient = new SolrZkClient(zkServer.getZkHost(), AbstractZkTestCase.TIMEOUT); + assertTrue("shard1 was not found in zk layout", zkClient.exists("/solr/collections/collection1/shards/shard1")); + zkClient.close(); + + del("*:*"); + indexr(id,1, i1, 100, tlong, 100,t1,"now is the time for all good men" + ,"foo_f", 1.414f, "foo_b", "true", "foo_d", 1.414d); + indexr(id,2, i1, 50 , tlong, 50,t1,"to come to the aid of their country." + ); + indexr(id,3, i1, 2, tlong, 2,t1,"how now brown cow" + ); + indexr(id,4, i1, -100 ,tlong, 101,t1,"the quick fox jumped over the lazy dog" + ); + indexr(id,5, i1, 500, tlong, 500 ,t1,"the quick fox jumped way over the lazy dog" + ); + indexr(id,6, i1, -600, tlong, 600 ,t1,"humpty dumpy sat on a wall"); + indexr(id,7, i1, 123, tlong, 123 ,t1,"humpty dumpy had a great fall"); + indexr(id,8, i1, 876, tlong, 876,t1,"all the kings horses and all the kings men"); + indexr(id,9, i1, 7, tlong, 7,t1,"couldn't put humpty together again"); + indexr(id,10, i1, 4321, tlong, 4321,t1,"this too shall pass"); + indexr(id,11, i1, -987, tlong, 987,t1,"An eye for eye only ends up making the whole world blind."); + indexr(id,12, i1, 379, tlong, 379,t1,"Great works are performed, not by strength, but by perseverance."); + indexr(id,13, i1, 232, tlong, 232,t1,"no eggs on wall, lesson learned", oddField, "odd man out"); + + indexr(id, 14, "SubjectTerms_mfacet", new String[] {"mathematical models", "mathematical analysis"}); + indexr(id, 15, "SubjectTerms_mfacet", new String[] {"test 1", "test 2", "test3"}); + indexr(id, 16, "SubjectTerms_mfacet", new String[] {"test 1", "test 2", "test3"}); + String[] vals = new String[100]; + for (int i=0; i<100; i++) { + vals[i] = "test " + i; + } + indexr(id, 17, "SubjectTerms_mfacet", vals); + + for (int i=100; i<150; i++) { + indexr(id, i); + } + + commit(); + + handle.clear(); + handle.put("QTime", SKIPVAL); + handle.put("timestamp", SKIPVAL); + + // random value sort + for (String f : fieldNames) { + query("q","*:*", "sort",f+" desc"); + query("q","*:*", "sort",f+" asc"); + } + + // these queries should be exactly ordered and scores should exactly match + query("q","*:*", "sort",i1+" desc"); + query("q","*:*", "sort",i1+" asc"); + query("q","*:*", "sort",i1+" desc", "fl","*,score"); + query("q","*:*", "sort","n_tl1 asc", "fl","score"); // test legacy behavior - "score"=="*,score" + query("q","*:*", "sort","n_tl1 desc"); + handle.put("maxScore", SKIPVAL); + query("q","{!func}"+i1);// does not expect maxScore. So if it comes ,ignore it. JavaBinCodec.writeSolrDocumentList() + //is agnostic of request params. + handle.remove("maxScore"); + query("q","{!func}"+i1, "fl","*,score"); // even scores should match exactly here + + handle.put("highlighting", UNORDERED); + handle.put("response", UNORDERED); + + handle.put("maxScore", SKIPVAL); + query("q","quick"); + query("q","all","fl","id","start","0"); + query("q","all","fl","foofoofoo","start","0"); // no fields in returned docs + query("q","all","fl","id","start","100"); + + handle.put("score", SKIPVAL); + query("q","quick","fl","*,score"); + query("q","all","fl","*,score","start","1"); + query("q","all","fl","*,score","start","100"); + + query("q","now their fox sat had put","fl","*,score", + "hl","true","hl.fl",t1); + + query("q","now their fox sat had put","fl","foofoofoo", + "hl","true","hl.fl",t1); + + query("q","matchesnothing","fl","*,score"); + + query("q","*:*", "rows",100, "facet","true", "facet.field",t1); + query("q","*:*", "rows",100, "facet","true", "facet.field",t1, "facet.limit",-1, "facet.sort","count"); + query("q","*:*", "rows",100, "facet","true", "facet.field",t1, "facet.limit",-1, "facet.sort","count", "facet.mincount",2); + query("q","*:*", "rows",100, "facet","true", "facet.field",t1, "facet.limit",-1, "facet.sort","index"); + query("q","*:*", "rows",100, "facet","true", "facet.field",t1, "facet.limit",-1, "facet.sort","index", "facet.mincount",2); + query("q","*:*", "rows",100, "facet","true", "facet.field",t1,"facet.limit",1); + query("q","*:*", "rows",100, "facet","true", "facet.query","quick", "facet.query","all", "facet.query","*:*"); + query("q","*:*", "rows",100, "facet","true", "facet.field",t1, "facet.offset",1); + query("q","*:*", "rows",100, "facet","true", "facet.field",t1, "facet.mincount",2); + + // test faceting multiple things at once + query("q","*:*", "rows",100, "facet","true", "facet.query","quick", "facet.query","all", "facet.query","*:*" + ,"facet.field",t1); + + // test filter tagging, facet exclusion, and naming (multi-select facet support) + query("q","*:*", "rows",100, "facet","true", "facet.query","{!key=myquick}quick", "facet.query","{!key=myall ex=a}all", "facet.query","*:*" + ,"facet.field","{!key=mykey ex=a}"+t1 + ,"facet.field","{!key=other ex=b}"+t1 + ,"facet.field","{!key=again ex=a,b}"+t1 + ,"facet.field",t1 + ,"fq","{!tag=a}id:[1 TO 7]", "fq","{!tag=b}id:[3 TO 9]" + ); + query("q", "*:*", "facet", "true", "facet.field", "{!ex=t1}SubjectTerms_mfacet", "fq", "{!tag=t1}SubjectTerms_mfacet:(test 1)", "facet.limit", "10", "facet.mincount", "1"); + + // test field that is valid in schema but missing in all shards + query("q","*:*", "rows",100, "facet","true", "facet.field",missingField, "facet.mincount",2); + // test field that is valid in schema and missing in some shards + query("q","*:*", "rows",100, "facet","true", "facet.field",oddField, "facet.mincount",2); + + query("q","*:*", "sort",i1+" desc", "stats", "true", "stats.field", i1); + + + // Try to get better coverage for refinement queries by turning off over requesting. + // This makes it much more likely that we may not get the top facet values and hence + // we turn of that checking. + handle.put("facet_fields", SKIPVAL); + query("q","*:*", "rows",0, "facet","true", "facet.field",t1,"facet.limit",5, "facet.shard.limit",5); + // check a complex key name + query("q","*:*", "rows",0, "facet","true", "facet.field","{!key='a b/c \\' \\} foo'}"+t1,"facet.limit",5, "facet.shard.limit",5); + handle.remove("facet_fields"); + + + // index the same document to two servers and make sure things + // don't blow up. + if (clients.size()>=2) { + index(id,100, i1, 107 ,t1,"oh no, a duplicate!"); + for (int i=0; i threads = new ArrayList(); Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java?rev=1180745&r1=1180744&r2=1180745&view=diff ============================================================================== --- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java (original) +++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java Sun Oct 9 23:45:59 2011 @@ -260,11 +260,17 @@ public class ZkControllerTest extends So zkController.createCollectionZkNode(cloudDesc); String shard1 = zkController.register("core1", cloudDesc); + cloudDesc.setShardId(null); String shard2 = zkController.register("core2", cloudDesc); + cloudDesc.setShardId(null); String shard3 = zkController.register("core3", cloudDesc); + cloudDesc.setShardId(null); String shard4 = zkController.register("core4", cloudDesc); + cloudDesc.setShardId(null); String shard5 = zkController.register("core5", cloudDesc); + cloudDesc.setShardId(null); String shard6 = zkController.register("core6", cloudDesc); + cloudDesc.setShardId(null); assertEquals("shard1", shard1); assertEquals("shard2", shard2); Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/handler/TestReplicationHandler.java URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/handler/TestReplicationHandler.java?rev=1180745&r1=1180744&r2=1180745&view=diff ============================================================================== --- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/handler/TestReplicationHandler.java (original) +++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/handler/TestReplicationHandler.java Sun Oct 9 23:45:59 2011 @@ -105,10 +105,9 @@ public class TestReplicationHandler exte } private static JettySolrRunner createJetty(SolrInstance instance) throws Exception { - System.setProperty("solr.solr.home", instance.getHomeDir()); System.setProperty("solr.data.dir", instance.getDataDir()); - JettySolrRunner jetty = new JettySolrRunner("/solr", 0); + JettySolrRunner jetty = new JettySolrRunner(instance.getHomeDir(), "/solr", 0); jetty.start(); return jetty; Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/schema/TestBinaryField.java URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/schema/TestBinaryField.java?rev=1180745&r1=1180744&r2=1180745&view=diff ============================================================================== --- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/schema/TestBinaryField.java (original) +++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/schema/TestBinaryField.java Sun Oct 9 23:45:59 2011 @@ -67,12 +67,11 @@ public class TestBinaryField extends Luc out = new FileOutputStream(f); IOUtils.copy(loader.openResource(fname), out); out.close(); - System.setProperty("solr.solr.home", homeDir.getAbsolutePath()); System.setProperty("solr.data.dir", dataDir.getAbsolutePath()); System.setProperty("solr.test.sys.prop1", "propone"); System.setProperty("solr.test.sys.prop2", "proptwo"); - jetty = new JettySolrRunner(context, 0); + jetty = new JettySolrRunner(homeDir.getAbsolutePath(), context, 0); jetty.start(); port = jetty.getLocalPort(); Added: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/update/processor/TestDistributedUpdate.java URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/update/processor/TestDistributedUpdate.java?rev=1180745&view=auto ============================================================================== --- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/update/processor/TestDistributedUpdate.java (added) +++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/update/processor/TestDistributedUpdate.java Sun Oct 9 23:45:59 2011 @@ -0,0 +1,369 @@ +package org.apache.solr.update.processor; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.File; +import java.util.ArrayList; +import java.util.List; + +import org.apache.solr.SolrTestCaseJ4; +import org.apache.solr.client.solrj.SolrServer; +import org.apache.solr.client.solrj.embedded.JettySolrRunner; +import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer; +import org.apache.solr.client.solrj.request.AbstractUpdateRequest; +import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.client.solrj.request.UpdateRequestExt; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.params.ModifiableSolrParams; + +public class TestDistributedUpdate extends SolrTestCaseJ4 { + + private static final int NUM_JETTIES = 2; + + File testDir; + + List clients = new ArrayList(); + List jettys = new ArrayList(); + String context = "/solr/collection1"; + String shardStr; + String[] shards; + boolean updateSelf; + + String id = "id"; + String t1 = "a_t"; + String i1 = "a_i"; + String oddField = "oddField_s"; + String missingField = "missing_but_valid_field_t"; + String invalidField = "invalid_field_not_in_schema"; + + @Override + public void setUp() throws Exception { + super.setUp(); + System.setProperty("solr.test.sys.prop1", "propone"); + System.setProperty("solr.test.sys.prop2", "proptwo"); + testDir = new File(TEMP_DIR, "distrib_update_test"); + testDir.mkdirs(); + } + + @Override + public void tearDown() throws Exception { + destroyServers(); + super.tearDown(); + } + + private void createServers() throws Exception { + StringBuilder sb = new StringBuilder(); + shards = new String[NUM_JETTIES]; + for (int i = 0; i < NUM_JETTIES; i++) { + + if (sb.length() > 0) sb.append(','); + JettySolrRunner jetty = createJetty(testDir, testDir + "/shard" + i + + "/data", "solrconfig-distrib-update.xml"); + jettys.add(jetty); + int port = jetty.getLocalPort(); + clients.add(createNewSolrServer(port)); + shards[i] = "localhost:" + port + context; + sb.append(shards[i]); + } + + shardStr = sb.toString(); + + // Assure that Solr starts with no documents + send(commit(u().deleteByQuery("*:*"))); + } + + private void destroyServers() throws Exception { + for (JettySolrRunner jetty : jettys) + jetty.stop(); + clients.clear(); + jettys.clear(); + } + + public JettySolrRunner createJetty(File baseDir, String dataDir) + throws Exception { + return createJetty(baseDir, dataDir, null, null); + } + + public JettySolrRunner createJetty(File baseDir, String dataDir, + String shardId) throws Exception { + return createJetty(baseDir, dataDir, shardId, + "solrconfig-distrib-update.xml"); + } + + public JettySolrRunner createJetty(File baseDir, String dataDir, + String shardList, String solrConfigOverride) throws Exception { + System.setProperty("solr.data.dir", dataDir); + + JettySolrRunner jetty = new JettySolrRunner(TEST_HOME(), "/solr", 0, solrConfigOverride); + if (shardList != null) { + System.setProperty("shard", shardList); + } + jetty.start(); + System.clearProperty("shard"); + return jetty; + } + + protected SolrServer createNewSolrServer(int port) { + try { + // setup the server... + String url = "http://localhost:" + port + context; + CommonsHttpSolrServer s = new CommonsHttpSolrServer(url); + s.setConnectionTimeout(1000); // 1 sec + s.setDefaultMaxConnectionsPerHost(100); + s.setMaxTotalConnections(100); + return s; + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + QueryResponse query(Object... q) throws Exception { + ModifiableSolrParams params = new ModifiableSolrParams(); + + for (int i = 0; i < q.length; i += 2) { + params.add(q[i].toString(), q[i + 1].toString()); + } + + params.set("shards", shardStr); + + // query a random server + int which = random.nextInt(clients.size()); + SolrServer client = clients.get(which); + QueryResponse rsp = client.query(params); + return rsp; + } + + void send(int which, AbstractUpdateRequest ureq) throws Exception { + ureq.setParam("update.chain", "distrib-update-chain"); + ureq.setParam("shards", shardStr); + ureq.setParam("self", updateSelf ? shards[which] : "foo"); + + SolrServer client = clients.get(which); + client.request(ureq); + } + + SolrInputDocument doc(Object... fields) { + SolrInputDocument doc = new SolrInputDocument(); + for (int i = 0; i < fields.length; i += 2) { + doc.addField((String) (fields[i]), fields[i + 1]); + } + return doc; + } + + // send request to a random server + void send(AbstractUpdateRequest ureq) throws Exception { + send((random.nextInt() >>> 1) % shards.length, ureq); + } + + UpdateRequest u() { + return new UpdateRequest(); + } + + AbstractUpdateRequest commit(AbstractUpdateRequest ureq) { + ureq.setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true); + return ureq; + } + + UpdateRequest optimize(UpdateRequest ureq) { + ureq.setAction(UpdateRequest.ACTION.OPTIMIZE, true, true); + return ureq; + } + + UpdateRequest add(UpdateRequest ureq, Object... ids) { + for (Object id : ids) { + SolrInputDocument doc = new SolrInputDocument(); + doc.addField("id", id.toString()); + ureq.add(doc); + } + return ureq; + } + + void verifyCount(String q, int count) throws Exception { + verifyCount(q, count, 0); + } + + void verifyCount(String q, int count, int retries) throws Exception { + long found = query("q", q).getResults().getNumFound(); + for (int i = 0; i < retries; i++) { + if (found == count) { + break; + } + Thread.sleep(500); + found = query("q", q).getResults().getNumFound(); + } + + assertEquals(count, found); + // use a facet to get the "real" count since distributed search + // can do some dedup for us. + assertEquals(count, query("q", "*:*", "facet", "true", "facet.query", q) + .getFacetQuery().get(q).longValue()); + } + + public void testStress() throws Exception { + int iter = 10; // crank this number up for a long term test + + createServers(); + updateSelf = true; + + List docs = new ArrayList(1000); + for (int i = 0; i < 1000; i++) { + SolrInputDocument doc = new SolrInputDocument(); + doc.addField("id", "" + i); + docs.add(doc); + } + + List ids = new ArrayList(1000); + for (int i = 0; i < 1000; i++) + ids.add("" + i); + + boolean haveAddedDocs = false; + for (int i = 0; i < iter; i++) { + // System.out.println("ITERATION"+i); + if ((random.nextInt() & 0x01) == 0) { + // System.out.println("ITERATION"+i+" - 0"); + haveAddedDocs = true; + UpdateRequest addReq = new UpdateRequest(); + addReq.add(docs); + send(commit(addReq)); + verifyCount("id:[* TO *]", 1000); + } else { + // System.out.println("ITERATION"+i+" - 1"); + UpdateRequest delReq = new UpdateRequest(); + for (int j = 0; j < 1000; j += 2) { + delReq.deleteById(ids.get(j)); + } + send(commit(delReq)); + verifyCount("id:[* TO *]", (haveAddedDocs ? 500 : 0)); + } + + // optimize to keep the index size under control. + if (i % 25 == 0) { + send(optimize(u())); + } + + } + + destroyServers(); + } + + public void testDistribUpdate() throws Exception { + for (int nServers = 2; nServers < 4; nServers++) { + + createServers(); + + // node doesn't know who it is... sends to itself over HTTP + updateSelf = false; + doTest(); + + // node does know who it is... updates index directly for itself + updateSelf = true; + doTest(); + + destroyServers(); + } + } + + public void doTest() throws Exception { + send(0, commit(u().deleteByQuery("*:*"))); + verifyCount("id:1", 0); + + send(0, add(u(), 1)); + send(1, add(u(), 1)); + verifyCount("id:1", 0); // no commit yet + send(commit(u())); + verifyCount("id:1", 1); // doc should only have been sent to single server + + send(u().deleteById("1")); + verifyCount("id:1", 1); // no commit yet + send(commit(u())); + verifyCount("id:1", 0); + + // test adding a commit onto an add + send(commit(add(u(), 1))); + verifyCount("id:1", 1); + + // test adding a commmit onto a delete + send(commit(u().deleteById("1"))); + verifyCount("id:1", 0); + + // test that batching adds doesn't mess anything up + send(add(u(), 1, 2, 3, 4, 5, 6, 7, 8, 9)); + send(commit(u())); + // Thread.sleep(1000000000); + verifyCount("id:[1 TO 9]", 9); + + // test delete by query + send(commit(u().deleteByQuery("id:[2 TO 8]"))); + verifyCount("id:[1 TO 9]", 2); + + send(commit(add(u(), 1, 2, 3, 4, 5, 6, 7, 8, 9))); + verifyCount("id:[1 TO 9]", 9); + + send(commit(u().deleteByQuery("*:*"))); + verifyCount("id:[1 TO 9]", 0); + + // this test can cause failures if a commit can sneak in ahead of + // add requests that are still pending. + Object[] docs = new Object[1000]; + for (int i = 0; i < 1000; i++) + docs[i] = i; + send(commit(add(u(), docs))); + verifyCount("id:[* TO *]", 1000); + + // test delete batching + UpdateRequest ureq = u(); + for (int i = 0; i < 1000; i += 2) { + ureq.deleteById("" + i); + } + send(commit(ureq)); + verifyCount("id:[* TO *]", 500); + + // test commit within + ureq = u(); + ureq.setCommitWithin(1); + SolrInputDocument doc = new SolrInputDocument(); + doc.addField("id", "999999"); + ureq.add(doc); + send(ureq); + + + verifyCount("id:[* TO *]", 501, 300); + + send(commit(ureq)); + + // test overwrite + UpdateRequestExt lweureq = new UpdateRequestExt(); + doc = new SolrInputDocument(); + doc.addField("id", "999999"); + lweureq.add(doc, 3, false); + send(commit(lweureq)); + + verifyCount("id:[* TO *]", 502); + + // test overwrite with no commitWithin + lweureq = new UpdateRequestExt(); + doc = new SolrInputDocument(); + doc.addField("id", "999999"); + lweureq.add(doc, -1, false); + send(commit(lweureq)); + + verifyCount("id:[* TO *]", 503); + } + +} Added: lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequestExt.java URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequestExt.java?rev=1180745&view=auto ============================================================================== --- lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequestExt.java (added) +++ lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequestExt.java Sun Oct 9 23:45:59 2011 @@ -0,0 +1,234 @@ +package org.apache.solr.client.solrj.request; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.IOException; +import java.io.StringWriter; +import java.io.Writer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.solr.client.solrj.util.ClientUtils; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.util.ContentStream; +import org.apache.solr.common.util.XML; + +// TODO: bake this into UpdateRequest +public class UpdateRequestExt extends AbstractUpdateRequest { + + private List documents = null; + private List deleteById = null; + private List deleteQuery = null; + + private class SolrDoc { + @Override + public String toString() { + return "SolrDoc [document=" + document + ", commitWithin=" + commitWithin + + ", overwrite=" + overwrite + "]"; + } + SolrInputDocument document; + int commitWithin; + boolean overwrite; + } + + public UpdateRequestExt() { + super(METHOD.POST, "/update"); + } + + public UpdateRequestExt(String url) { + super(METHOD.POST, url); + } + + // --------------------------------------------------------------------------- + // --------------------------------------------------------------------------- + + /** + * clear the pending documents and delete commands + */ + public void clear() { + if (documents != null) { + documents.clear(); + } + if (deleteById != null) { + deleteById.clear(); + } + if (deleteQuery != null) { + deleteQuery.clear(); + } + } + + // --------------------------------------------------------------------------- + // --------------------------------------------------------------------------- + + public UpdateRequestExt add(final SolrInputDocument doc) { + if (documents == null) { + documents = new ArrayList(2); + } + SolrDoc solrDoc = new SolrDoc(); + solrDoc.document = doc; + solrDoc.commitWithin = -1; + solrDoc.overwrite = true; + documents.add(solrDoc); + + return this; + } + + public UpdateRequestExt add(final SolrInputDocument doc, int commitWithin, + boolean overwrite) { + if (documents == null) { + documents = new ArrayList(2); + } + SolrDoc solrDoc = new SolrDoc(); + solrDoc.document = doc; + solrDoc.commitWithin = commitWithin; + solrDoc.overwrite = overwrite; + documents.add(solrDoc); + + return this; + } + + public UpdateRequestExt deleteById(String id) { + if (deleteById == null) { + deleteById = new ArrayList(); + } + deleteById.add(id); + return this; + } + + public UpdateRequestExt deleteById(List ids) { + if (deleteById == null) { + deleteById = new ArrayList(ids); + } else { + deleteById.addAll(ids); + } + return this; + } + + public UpdateRequestExt deleteByQuery(String q) { + if (deleteQuery == null) { + deleteQuery = new ArrayList(); + } + deleteQuery.add(q); + return this; + } + + // -------------------------------------------------------------------------- + // -------------------------------------------------------------------------- + + @Override + public Collection getContentStreams() throws IOException { + return ClientUtils.toContentStreams(getXML(), ClientUtils.TEXT_XML); + } + + public String getXML() throws IOException { + StringWriter writer = new StringWriter(); + writeXML(writer); + writer.flush(); + + String xml = writer.toString(); + + return (xml.length() > 0) ? xml : null; + } + + public void writeXML(Writer writer) throws IOException { + List> getDocLists = getDocLists(documents); + + for (List docs : getDocLists) { + + if ((docs != null && docs.size() > 0)) { + SolrDoc firstDoc = docs.get(0); + int commitWithin = firstDoc.commitWithin != -1 ? firstDoc.commitWithin : this.commitWithin; + boolean overwrite = firstDoc.overwrite; + if (commitWithin > -1 || overwrite != true) { + writer.write(""); + } else { + writer.write(""); + } + if (documents != null) { + for (SolrDoc doc : documents) { + if (doc != null) { + ClientUtils.writeXML(doc.document, writer); + } + } + } + + writer.write(""); + } + } + + // Add the delete commands + boolean deleteI = deleteById != null && deleteById.size() > 0; + boolean deleteQ = deleteQuery != null && deleteQuery.size() > 0; + if (deleteI || deleteQ) { + writer.append(""); + if (deleteI) { + for (String id : deleteById) { + writer.append(""); + XML.escapeCharData(id, writer); + writer.append(""); + } + } + if (deleteQ) { + for (String q : deleteQuery) { + writer.append(""); + XML.escapeCharData(q, writer); + writer.append(""); + } + } + writer.append(""); + } + } + + private List> getDocLists(List documents) { + List> docLists = new ArrayList>(); + if (this.documents == null) { + return docLists; + } + boolean lastOverwrite = true; + int lastCommitWithin = -1; + List docList = null; + for (SolrDoc doc : this.documents) { + if (doc.overwrite != lastOverwrite + || doc.commitWithin != lastCommitWithin || docLists.size() == 0) { + docList = new ArrayList(); + docLists.add(docList); + } + docList.add(doc); + lastCommitWithin = doc.commitWithin; + lastOverwrite = doc.overwrite; + } + + return docLists; + } + + public List getDeleteById() { + return deleteById; + } + + public List getDeleteQuery() { + return deleteQuery; + } + + @Override + public String toString() { + return "UpdateRequestExt [documents=" + documents + ", deleteById=" + + deleteById + ", deleteQuery=" + deleteQuery + "]"; + } + +} Modified: lucene/dev/branches/solrcloud/solr/solrj/src/test/org/apache/solr/client/solrj/TestLBHttpSolrServer.java URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/test/org/apache/solr/client/solrj/TestLBHttpSolrServer.java?rev=1180745&r1=1180744&r2=1180745&view=diff ============================================================================== --- lucene/dev/branches/solrcloud/solr/solrj/src/test/org/apache/solr/client/solrj/TestLBHttpSolrServer.java (original) +++ lucene/dev/branches/solrcloud/solr/solrj/src/test/org/apache/solr/client/solrj/TestLBHttpSolrServer.java Sun Oct 9 23:45:59 2011 @@ -258,8 +258,7 @@ public class TestLBHttpSolrServer extend } public void startJetty() throws Exception { - jetty = new JettySolrRunner("/solr", port); - System.setProperty("solr.solr.home", getHomeDir()); + jetty = new JettySolrRunner(getHomeDir(), "/solr", port); System.setProperty("solr.data.dir", getDataDir()); jetty.start(); int newPort = jetty.getLocalPort(); Modified: lucene/dev/branches/solrcloud/solr/solrj/src/test/org/apache/solr/client/solrj/embedded/MultiCoreExampleJettyTest.java URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/test/org/apache/solr/client/solrj/embedded/MultiCoreExampleJettyTest.java?rev=1180745&r1=1180744&r2=1180745&view=diff ============================================================================== --- lucene/dev/branches/solrcloud/solr/solrj/src/test/org/apache/solr/client/solrj/embedded/MultiCoreExampleJettyTest.java (original) +++ lucene/dev/branches/solrcloud/solr/solrj/src/test/org/apache/solr/client/solrj/embedded/MultiCoreExampleJettyTest.java Sun Oct 9 23:45:59 2011 @@ -53,7 +53,7 @@ public class MultiCoreExampleJettyTest e System.clearProperty("solr.directoryFactory"); super.setUp(); - jetty = new JettySolrRunner( context, 0 ); + jetty = new JettySolrRunner(getSolrHome(), context, 0 ); jetty.start(false); port = jetty.getLocalPort();