Return-Path: X-Original-To: apmail-lucene-commits-archive@www.apache.org Delivered-To: apmail-lucene-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id EA12193C7 for ; Mon, 14 Nov 2011 13:51:24 +0000 (UTC) Received: (qmail 76379 invoked by uid 500); 14 Nov 2011 13:51:24 -0000 Mailing-List: contact commits-help@lucene.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@lucene.apache.org Delivered-To: mailing list commits@lucene.apache.org Received: (qmail 76372 invoked by uid 99); 14 Nov 2011 13:51:24 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 14 Nov 2011 13:51:24 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 14 Nov 2011 13:51:19 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 3F9B32388860; Mon, 14 Nov 2011 13:50:58 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1201704 - in /lucene/dev/branches/solrcloud/solr: core/src/java/org/apache/solr/cloud/ core/src/java/org/apache/solr/update/processor/ core/src/test/org/apache/solr/cloud/ solrj/src/java/org/apache/solr/client/solrj/impl/ test-framework/sr... Date: Mon, 14 Nov 2011 13:50:57 -0000 To: commits@lucene.apache.org From: markrmiller@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20111114135058.3F9B32388860@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: markrmiller Date: Mon Nov 14 13:50:57 2011 New Revision: 1201704 URL: http://svn.apache.org/viewvc?rev=1201704&view=rev Log: push on FullDistributedZkTest and tweaks/fixes around that Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java lucene/dev/branches/solrcloud/solr/test-framework/src/java/org/apache/solr/BaseDistributedSearchTestCase.java Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1201704&r1=1201703&r2=1201704&view=diff ============================================================================== --- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original) +++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java Mon Nov 14 13:50:57 2011 @@ -138,8 +138,6 @@ public final class ZkController { public void command() { try { // we need to create all of our lost watches -// zkStateReader.makeCollectionsNodeWatches(); -// zkStateReader.makeShardsWatches(true); createEphemeralLiveNode(); zkStateReader.createClusterStateWatchersAndUpdate(); @@ -486,7 +484,12 @@ public final class ZkController { log.info("Attempting to update /clusterstate version " + stat.getVersion()); CloudState state = CloudState.load(data); - + Map slices = state.getSlices(cloudDesc.getCollectionName()); + if (slices != null && slices.containsKey(shardZkNodeName)) { + // TODO: we where already registered - go into recovery mode + System.out.println("RECOVERY"); + } + state.addSlice(cloudDesc.getCollectionName(), slice); try { Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1201704&r1=1201703&r2=1201704&view=diff ============================================================================== --- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original) +++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Mon Nov 14 13:50:57 2011 @@ -681,7 +681,6 @@ public class DistributedUpdateProcessor url = shard; } - // TODO: allow shard syntax to use : to specify replicas SolrServer server = new CommonsHttpSolrServer(url, client); sreq.ursp = server.request(sreq.ureq); Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java?rev=1201704&r1=1201703&r2=1201704&view=diff ============================================================================== --- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java (original) +++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java Mon Nov 14 13:50:57 2011 @@ -176,7 +176,8 @@ public class DistributedUpdateProcessorF replicasUrl.append(replicaUrl); } - params.add("self", self); + // we don't currently use self - it does not yet work with the | notation anyhow + //params.add("self", self); params.add("shards", replicasUrl.toString()); } Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java?rev=1201704&r1=1201703&r2=1201704&view=diff ============================================================================== --- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java (original) +++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java Mon Nov 14 13:50:57 2011 @@ -19,7 +19,13 @@ package org.apache.solr.cloud; import java.io.IOException; import java.net.MalformedURLException; +import java.net.URI; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.solr.client.solrj.SolrQuery; import org.apache.solr.client.solrj.SolrServer; import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.embedded.JettySolrRunner; @@ -28,7 +34,9 @@ import org.apache.solr.client.solrj.impl import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.client.solrj.response.QueryResponse; import org.apache.solr.common.SolrInputDocument; -import org.apache.solr.common.cloud.SolrZkClient; +import org.apache.solr.common.cloud.Slice; +import org.apache.solr.common.cloud.ZkNodeProps; +import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.params.CommonParams; import org.apache.solr.common.params.ModifiableSolrParams; import org.junit.BeforeClass; @@ -58,6 +66,11 @@ public class FullDistributedZkTest exten String invalidField="ignore_exception__invalid_field_not_in_schema"; private static final int sliceCount = 3; + + protected Map clientToInfo = new HashMap(); + protected Map> shardToClient = new HashMap>(); + protected Map> shardToJetty = new HashMap>(); + @BeforeClass public static void beforeClass() throws Exception { System.setProperty("CLOUD_UPDATE_DELAY", "0"); @@ -69,6 +82,10 @@ public class FullDistributedZkTest exten public FullDistributedZkTest() { fixShardCount = true; shardCount = 6; + + // TODO: for now, turn off stress because it uses regular clients, and we + // need the cloud client because we kill servers + stress = 0; } @Override @@ -83,8 +100,73 @@ public class FullDistributedZkTest exten if (sb.length() > 0) sb.append(','); JettySolrRunner j = createJetty(testDir, testDir + "/jetty" + i, null, "solrconfig-distrib-update.xml"); jettys.add(j); - clients.add(createNewSolrServer(j.getLocalPort())); - + SolrServer client = createNewSolrServer(j.getLocalPort()); + clients.add(client); + } + + for (SolrServer client : clients) { + // find info for this client in zk + ZkStateReader zk = new ZkStateReader(zkServer.getZkAddress(), 10000, + AbstractZkTestCase.TIMEOUT); + zk.createClusterStateWatchersAndUpdate(); + + Map slices = zk.getCloudState().getSlices( + DEFAULT_COLLECTION); + zk.updateCloudState(true); + + for (Map.Entry slice : slices.entrySet()) { + Map theShards = slice.getValue().getShards(); + for (Map.Entry shard : theShards.entrySet()) { + String shardName = new URI( + ((CommonsHttpSolrServer) client).getBaseURL()).getPort() + + "_solr_"; + // System.out.println("key:" + shard.getKey() + " try:" + shardName); + if (shard.getKey().endsWith(shardName)) { + System.out.println("shard:" + slice.getKey()); + System.out.println(shard.getValue()); + + clientToInfo.put(client, shard.getValue()); + List list = shardToClient.get(slice.getKey()); + if (list == null) { + list = new ArrayList(); + shardToClient.put(slice.getKey(), list); + } + list.add(client); + } + } + } + + } + + for (JettySolrRunner jetty : jettys) { + // find info for this client in zk + ZkStateReader zk = new ZkStateReader(zkServer.getZkAddress(), 10000, + AbstractZkTestCase.TIMEOUT); + zk.createClusterStateWatchersAndUpdate(); + + Map slices = zk.getCloudState().getSlices( + DEFAULT_COLLECTION); + zk.updateCloudState(true); + + for (Map.Entry slice : slices.entrySet()) { + Map theShards = slice.getValue().getShards(); + for (Map.Entry shard : theShards.entrySet()) { + String shardName = jetty.getLocalPort() + "_solr_"; + // System.out.println("key:" + shard.getKey() + " try:" + shardName); + if (shard.getKey().endsWith(shardName)) { +// System.out.println("shard:" + slice.getKey()); +// System.out.println(shard.getValue()); + + List list = shardToJetty.get(slice.getKey()); + if (list == null) { + list = new ArrayList(); + shardToJetty.put(slice.getKey(), list); + } + list.add(jetty); + } + } + } + } // build the shard string @@ -124,6 +206,7 @@ public class FullDistributedZkTest exten boolean pick = random.nextBoolean(); int which = (doc.getField(id).toString().hashCode() & 0x7fffffff) % sliceCount; + System.out.println("add doc to shard:" + which); if (pick) { which = which + ((shardCount / sliceCount) * random.nextInt(sliceCount-1)); @@ -138,6 +221,22 @@ public class FullDistributedZkTest exten ureq.process(client); } + protected void index_specific(int serverNumber, Object... fields) throws Exception { + SolrInputDocument doc = new SolrInputDocument(); + for (int i = 0; i < fields.length; i += 2) { + doc.addField((String) (fields[i]), fields[i + 1]); + } + controlClient.add(doc); + + CommonsHttpSolrServer client = (CommonsHttpSolrServer) clients.get(serverNumber); + + UpdateRequest ureq = new UpdateRequest(); + ureq.add(doc); + ureq.setParam("update.chain", "distrib-update-chain"); + System.out.println("set update.chain on req"); + ureq.process(client); + } + protected void del(String q) throws Exception { controlClient.deleteByQuery(q); for (SolrServer client : clients) { @@ -156,8 +255,14 @@ public class FullDistributedZkTest exten public void doTest() throws Exception { del("*:*"); + indexr(id,1, i1, 100, tlong, 100,t1,"now is the time for all good men" ,"foo_f", 1.414f, "foo_b", "true", "foo_d", 1.414d); + + commit(); + + assertDocCounts(); + indexr(id,2, i1, 50 , tlong, 50,t1,"to come to the aid of their country." ); indexr(id,3, i1, 2, tlong, 2,t1,"how now brown cow" @@ -189,7 +294,9 @@ public class FullDistributedZkTest exten } commit(); - + + assertDocCounts(); + handle.clear(); handle.put("QTime", SKIPVAL); handle.put("timestamp", SKIPVAL); @@ -276,11 +383,12 @@ public class FullDistributedZkTest exten handle.remove("facet_fields"); - // index the same document to two servers and make sure things + // index the same document to two shards and make sure things // don't blow up. + // assumes first n clients are first n shards if (clients.size()>=2) { index(id,100, i1, 107 ,t1,"oh no, a duplicate!"); - for (int i=0; i clientToInfo = new HashMap(); + Map slices = zk.getCloudState().getSlices(DEFAULT_COLLECTION); + + zk.updateCloudState(true); + + long clientCount = 0; + for (SolrServer client : clients) { + for (Map.Entry slice : slices.entrySet()) { + Map theShards = slice.getValue().getShards(); + for (Map.Entry shard : theShards.entrySet()) { + String shardName = new URI(((CommonsHttpSolrServer)client).getBaseURL()).getPort() + "_solr_"; + // System.out.println("key:" + shard.getKey() + " try:" + shardName); + if (shard.getKey().endsWith(shardName)) { + System.out.println("shard:" + slice.getKey()); + System.out.println(shard.getValue()); + } + } + } + + long count = client.query(new SolrQuery("*:*")).getResults().getNumFound(); + + System.out.println("docs:" + count + "\n\n"); + clientCount += count; + } + assertEquals("Doc Counts do not add up", controlCount, clientCount / (shardCount / sliceCount)); + } + volatile CloudSolrServer solrj; @Override protected QueryResponse queryServer(ModifiableSolrParams params) throws SolrServerException { - if (r.nextBoolean()) - return super.queryServer(params); - // use the distributed solrj client if (solrj == null) { synchronized(this) { Modified: lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java?rev=1201704&r1=1201703&r2=1201704&view=diff ============================================================================== --- lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java (original) +++ lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java Mon Nov 14 13:50:57 2011 @@ -31,6 +31,7 @@ import java.util.concurrent.TimeoutExcep import org.apache.solr.client.solrj.SolrRequest; import org.apache.solr.client.solrj.SolrServer; import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.CloudState; import org.apache.solr.common.cloud.Slice; @@ -118,6 +119,11 @@ public class CloudSolrServer extends Sol CloudState cloudState = zkStateReader.getCloudState(); String collection = request.getParams().get("collection", defaultCollection); + + if (request instanceof UpdateRequest) { + // hack to kind of let updates work - should be fixed more completely + return updateRequest(cloudState, collection, request); + } // TODO: allow multiple collections to be specified via comma separated list @@ -151,6 +157,39 @@ public class CloudSolrServer extends Sol return rsp.getResponse(); } + private NamedList updateRequest(CloudState cloudState, + String collection, SolrRequest request) throws SolrServerException, IOException { + // TODO: prefer updating to the leader + + Map slices = cloudState.getSlices(collection); + Set liveNodes = cloudState.getLiveNodes(); + + // IDEA: have versions on various things... like a global cloudState version + // or shardAddressVersion (which only changes when the shards change) + // to allow caching. + + // build a map of unique nodes + // TODO: allow filtering by group, role, etc + Map nodes = new HashMap(); + List urlList = new ArrayList(); + for (Slice slice : slices.values()) { + for (ZkNodeProps nodeProps : slice.getShards().values()) { + String node = nodeProps.get(ZkStateReader.NODE_NAME); + if (!liveNodes.contains(node)) continue; + if (nodes.put(node, nodeProps) == null) { + String url = nodeProps.get(ZkStateReader.URL_PROP); + urlList.add(url); + } + } + } + + + // lets update to a server that is up... + CommonsHttpSolrServer server = new CommonsHttpSolrServer(urlList.get(0)); + NamedList rsp = server.request(request); + return rsp; + } + public void close() { if (zkStateReader != null) { synchronized(this) { Modified: lucene/dev/branches/solrcloud/solr/test-framework/src/java/org/apache/solr/BaseDistributedSearchTestCase.java URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/test-framework/src/java/org/apache/solr/BaseDistributedSearchTestCase.java?rev=1201704&r1=1201703&r2=1201704&view=diff ============================================================================== --- lucene/dev/branches/solrcloud/solr/test-framework/src/java/org/apache/solr/BaseDistributedSearchTestCase.java (original) +++ lucene/dev/branches/solrcloud/solr/test-framework/src/java/org/apache/solr/BaseDistributedSearchTestCase.java Mon Nov 14 13:50:57 2011 @@ -40,6 +40,7 @@ import org.apache.solr.client.solrj.resp import org.apache.solr.common.SolrDocument; import org.apache.solr.common.SolrDocumentList; import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.util.NamedList; import org.apache.solr.schema.TrieDateField; @@ -69,6 +70,7 @@ public abstract class BaseDistributedSea protected JettySolrRunner controlJetty; protected List clients = new ArrayList(); protected List jettys = new ArrayList(); + protected String context = "/solr"; protected String shards; protected String[] shardsArr; @@ -310,7 +312,14 @@ public abstract class BaseDistributedSea protected void commit() throws Exception { controlClient.commit(); - for (SolrServer client : clients) client.commit(); + for (SolrServer client : clients) { + try { + client.commit(); + } catch (SolrServerException e) { + // we might have killed a server on purpose in the test + log.warn("", e); + } + } } protected QueryResponse queryServer(ModifiableSolrParams params) throws SolrServerException { @@ -322,12 +331,13 @@ public abstract class BaseDistributedSea } protected void query(Object... q) throws Exception { + final ModifiableSolrParams params = new ModifiableSolrParams(); for (int i = 0; i < q.length; i += 2) { params.add(q[i].toString(), q[i + 1].toString()); } - + System.out.println("Q:" + params); final QueryResponse controlRsp = controlClient.query(params); setDistributedParams(params); @@ -377,6 +387,8 @@ public abstract class BaseDistributedSea } public static String compare(NamedList a, NamedList b, int flags, Map handle) { + System.out.println("resp a:" + a); + System.out.println("resp b:" + b); boolean ordered = (flags & UNORDERED) == 0; int posa = 0, posb = 0;