lucene-commits mailing list archives

From uschind...@apache.org
Subject svn commit: r1235969 [2/9] - in /lucene/dev/branches/lucene2858: ./ dev-tools/eclipse/ dev-tools/idea/lucene/contrib/ dev-tools/maven/ lucene/ lucene/contrib/ lucene/contrib/sandbox/src/test/org/apache/lucene/sandbox/queries/regex/ lucene/src/java/org/...
Date Wed, 25 Jan 2012 21:56:51 GMT
Modified: lucene/dev/branches/lucene2858/modules/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/PerfTask.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/modules/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/PerfTask.java?rev=1235969&r1=1235968&r2=1235969&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/modules/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/PerfTask.java (original)
+++ lucene/dev/branches/lucene2858/modules/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/PerfTask.java Wed Jan 25 21:56:44 2012
@@ -199,9 +199,6 @@ public abstract class PerfTask implement
     return new String(c);
   }
   
-  /* (non-Javadoc)
-   * @see java.lang.Object#toString()
-   */
   @Override
   public String toString() {
     String padd = getPadding();
@@ -248,22 +245,23 @@ public abstract class PerfTask implement
   }
 
   /**
-   * Task setup work that should not be measured for that specific task.
-   * By default it does nothing, but tasks can implement this, moving work from 
-   * doLogic() to this method. Only the work done in doLogicis measured for this task.
-   * Notice that higher level (sequence) tasks containing this task would then 
-   * measure larger time than the sum of their contained tasks.
-   * @throws Exception 
+   * Task setup work that should not be measured for that specific task. By
+   * default it does nothing, but tasks can implement this, moving work from
+   * {@link #doLogic()} to this method. Only the work done in {@link #doLogic()}
+   * is measured for this task. Notice that higher level (sequence) tasks
+   * containing this task would then measure larger time than the sum of their
+   * contained tasks.
    */
   public void setup () throws Exception {
   }
-  
+
   /**
-   * Task tearDown work that should not be measured for that specific task.
-   * By default it does nothing, but tasks can implement this, moving work from 
-   * doLogic() to this method. Only the work done in doLogicis measured for this task.
-   * Notice that higher level (sequence) tasks containing this task would then 
-   * measure larger time than the sum of their contained tasks.
+   * Task tearDown work that should not be measured for that specific task. By
+   * default it does nothing, but tasks can implement this, moving work from
+   * {@link #doLogic()} to this method. Only the work done in {@link #doLogic()}
+   * is measured for this task. Notice that higher level (sequence) tasks
+   * containing this task would then measure larger time than the sum of their
+   * contained tasks.
    */
   public void tearDown() throws Exception {
     if (++logStepCount % logStep == 0) {
@@ -274,16 +272,20 @@ public abstract class PerfTask implement
   }
 
   /**
-   * Sub classes that supports parameters must override this method to return true.
+   * Sub classes that support parameters must override this method to return
+   * true.
+   * 
    * @return true iff this task supports command line params.
    */
   public boolean supportsParams () {
     return false;
   }
-  
+
   /**
    * Set the params of this task.
-   * @exception UnsupportedOperationException for tasks supporting command line parameters.
+   * 
+   * @exception UnsupportedOperationException
+   *              for tasks supporting command line parameters.
    */
   public void setParams(String params) {
     if (!supportsParams()) {

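To illustrate the setup()/doLogic()/tearDown() contract documented in the javadoc above, here is a minimal hypothetical task (not part of this commit; the class name, field, and query are invented, and searcher lifecycle handling is simplified). Only the time spent in doLogic() is charged to the task:

    import org.apache.lucene.benchmark.byTask.PerfRunData;
    import org.apache.lucene.benchmark.byTask.tasks.PerfTask;
    import org.apache.lucene.index.Term;
    import org.apache.lucene.search.IndexSearcher;
    import org.apache.lucene.search.Query;
    import org.apache.lucene.search.TermQuery;

    public class ExampleSearchTask extends PerfTask {
      private Query query; // built in setup(), so construction cost is not measured

      public ExampleSearchTask(PerfRunData runData) {
        super(runData);
      }

      @Override
      public void setup() throws Exception {
        // Preparation work: excluded from this task's measured time.
        query = new TermQuery(new Term("body", "lucene"));
      }

      @Override
      public int doLogic() throws Exception {
        // Only this method's elapsed time is attributed to the task.
        IndexSearcher searcher = new IndexSearcher(getRunData().getIndexReader());
        searcher.search(query, 10);
        return 1; // number of work items performed
      }
    }

Note that, as the javadoc warns, a sequence task wrapping this one would still measure setup() and tearDown() as part of its own elapsed time.
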
Modified: lucene/dev/branches/lucene2858/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/CHANGES.txt?rev=1235969&r1=1235968&r2=1235969&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/CHANGES.txt (original)
+++ lucene/dev/branches/lucene2858/solr/CHANGES.txt Wed Jan 25 21:56:44 2012
@@ -24,11 +24,11 @@ $Id$
 ==================  4.0.0-dev ==================
 Versions of Major Components
 ---------------------
-Apache Tika 0.10
+Apache Tika 1.0
 Carrot2 3.5.0
 Velocity 1.6.4 and Velocity Tools 2.0
 Apache UIMA 2.3.1
-Apache ZooKeeper 3.3.3
+Apache ZooKeeper 3.3.4
 
 
 Upgrading from Solr 3.6-dev
@@ -539,6 +539,8 @@ Bug Fixes
   HyphenatedWordsFilter where they would create invalid offsets in
   some situations, leading to problems in highlighting.  (Robert Muir)
 
+* SOLR-2280: commitWithin ignored for a delete query (Juan Grande via janhoy)
+
 Other Changes
 ----------------------
 * SOLR-2922: Upgrade commons-io and commons-lang to 2.1 and 2.6, respectively. (koji)
@@ -554,6 +556,8 @@ Other Changes
 * SOLR-2718: Add ability to lazy load response writers, defined with startup="lazy".
   (ehatcher)
 
+* SOLR-2901: Upgrade Solr to Tika 1.0 (janhoy)
+
 Build
 ----------------------
 * SOLR-2487: Add build target to package war without slf4j jars (janhoy)

Modified: lucene/dev/branches/lucene2858/solr/build.xml
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/build.xml?rev=1235969&r1=1235968&r2=1235969&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/build.xml (original)
+++ lucene/dev/branches/lucene2858/solr/build.xml Wed Jan 25 21:56:44 2012
@@ -482,7 +482,7 @@
           <packageset dir="contrib/langid/src/java"/>
           <packageset dir="contrib/uima/src/java"/>
           <group title="Core" packages="org.apache.*" />
-          <group title="SolrJ" packages="org.apache.solr.common.*,org.apache.solr.client.solrj*" />
+          <group title="SolrJ" packages="org.apache.solr.common.*,org.apache.solr.client.solrj.*,org.apache.zookeeper.*" />
           <group title="contrib: Clustering" packages="org.apache.solr.handler.clustering*" />
           <group title="contrib: DataImportHandler" packages="org.apache.solr.handler.dataimport*" />
           <group title="contrib: Solr Cell" packages="org.apache.solr.handler.extraction*" />

Modified: lucene/dev/branches/lucene2858/solr/contrib/dataimporthandler-extras/src/java/org/apache/solr/handler/dataimport/MailEntityProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/contrib/dataimporthandler-extras/src/java/org/apache/solr/handler/dataimport/MailEntityProcessor.java?rev=1235969&r1=1235968&r2=1235969&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/contrib/dataimporthandler-extras/src/java/org/apache/solr/handler/dataimport/MailEntityProcessor.java (original)
+++ lucene/dev/branches/lucene2858/solr/contrib/dataimporthandler-extras/src/java/org/apache/solr/handler/dataimport/MailEntityProcessor.java Wed Jan 25 21:56:44 2012
@@ -18,8 +18,8 @@ package org.apache.solr.handler.dataimpo
 
 import com.sun.mail.imap.IMAPMessage;
 
-import org.apache.tika.config.TikaConfig;
-import org.apache.tika.utils.ParseUtils;
+import org.apache.tika.Tika;
+import org.apache.tika.metadata.Metadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -95,6 +95,8 @@ public class MailEntityProcessor extends
               getStringFromContext("processAttachment",null) == null ? "processAttachement":"processAttachment"
             , true);
 
+    tika = new Tika();
+    
     logConfig();
   }
 
@@ -166,7 +168,10 @@ public class MailEntityProcessor extends
       if (!processAttachment || (disp != null && disp.equalsIgnoreCase(Part.ATTACHMENT)))        return;
       InputStream is = part.getInputStream();
       String fileName = part.getFileName();
-      String content = ParseUtils.getStringContent(is, TikaConfig.getDefaultConfig(), ctype.getBaseType().toLowerCase(Locale.ENGLISH));
+      Metadata md = new Metadata();
+      md.set(Metadata.CONTENT_TYPE, ctype.getBaseType().toLowerCase(Locale.ENGLISH));
+      md.set(Metadata.RESOURCE_NAME_KEY, fileName);
+      String content = tika.parseToString(is, md);
       if (disp != null && disp.equalsIgnoreCase(Part.ATTACHMENT)) {
         if (row.get(ATTACHMENT) == null)
           row.put(ATTACHMENT, new ArrayList<String>());
@@ -529,6 +534,8 @@ public class MailEntityProcessor extends
 
   private boolean processAttachment = true;
 
+  private Tika tika;
+  
   // holds the current state
   private Store mailbox;
   private boolean connected = false;

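For context, the new org.apache.tika.Tika facade used above replaces the removed ParseUtils helper. A minimal standalone sketch of the same call pattern (the file name and content type are placeholders, not values from this commit):

    import java.io.FileInputStream;
    import java.io.InputStream;
    import org.apache.tika.Tika;
    import org.apache.tika.metadata.Metadata;

    public class TikaFacadeSketch {
      public static void main(String[] args) throws Exception {
        Tika tika = new Tika();
        Metadata md = new Metadata();
        // Content-type and resource-name hints guide detection, as in the patch.
        md.set(Metadata.CONTENT_TYPE, "application/pdf");
        md.set(Metadata.RESOURCE_NAME_KEY, "report.pdf");
        InputStream is = new FileInputStream("report.pdf");
        try {
          System.out.println(tika.parseToString(is, md));
        } finally {
          is.close();
        }
      }
    }
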
Modified: lucene/dev/branches/lucene2858/solr/contrib/dataimporthandler-extras/src/java/org/apache/solr/handler/dataimport/TikaEntityProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/contrib/dataimporthandler-extras/src/java/org/apache/solr/handler/dataimport/TikaEntityProcessor.java?rev=1235969&r1=1235968&r2=1235969&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/contrib/dataimporthandler-extras/src/java/org/apache/solr/handler/dataimport/TikaEntityProcessor.java (original)
+++ lucene/dev/branches/lucene2858/solr/contrib/dataimporthandler-extras/src/java/org/apache/solr/handler/dataimport/TikaEntityProcessor.java Wed Jan 25 21:56:44 2012
@@ -118,9 +118,7 @@ public class TikaEntityProcessor extends
     }
     Parser tikaParser = null;
     if(parser.equals(AUTO_PARSER)){
-      AutoDetectParser parser = new AutoDetectParser();
-      parser.setConfig(tikaConfig);
-      tikaParser = parser;
+      tikaParser = new AutoDetectParser(tikaConfig);
     } else {
       tikaParser = (Parser) context.getSolrCore().getResourceLoader().newInstance(parser);
     }

Modified: lucene/dev/branches/lucene2858/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SolrWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SolrWriter.java?rev=1235969&r1=1235968&r2=1235969&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SolrWriter.java (original)
+++ lucene/dev/branches/lucene2858/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SolrWriter.java Wed Jan 25 21:56:44 2012
@@ -81,7 +81,7 @@ public class SolrWriter extends DIHWrite
     try {
       log.info("Deleting document: " + id);
       DeleteUpdateCommand delCmd = new DeleteUpdateCommand(req);
-      delCmd.id = id.toString();
+      delCmd.setId(id.toString());
       processor.processDelete(delCmd);
     } catch (IOException e) {
       log.error("Exception while deleteing: " + id, e);

Modified: lucene/dev/branches/lucene2858/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestContentStreamDataSource.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestContentStreamDataSource.java?rev=1235969&r1=1235968&r2=1235969&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestContentStreamDataSource.java (original)
+++ lucene/dev/branches/lucene2858/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestContentStreamDataSource.java Wed Jan 25 21:56:44 2012
@@ -173,9 +173,8 @@ public class TestContentStreamDataSource
   }
 
   private JettySolrRunner createJetty(SolrInstance instance) throws Exception {
-    System.setProperty("solr.solr.home", instance.getHomeDir());
     System.setProperty("solr.data.dir", instance.getDataDir());
-    JettySolrRunner jetty = new JettySolrRunner("/solr", 0);
+    JettySolrRunner jetty = new JettySolrRunner(instance.getHomeDir(), "/solr", 0);
     jetty.start();
     return jetty;
   }

Modified: lucene/dev/branches/lucene2858/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSolrEntityProcessorEndToEnd.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSolrEntityProcessorEndToEnd.java?rev=1235969&r1=1235968&r2=1235969&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSolrEntityProcessorEndToEnd.java (original)
+++ lucene/dev/branches/lucene2858/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSolrEntityProcessorEndToEnd.java Wed Jan 25 21:56:44 2012
@@ -47,7 +47,7 @@ public class TestSolrEntityProcessorEndT
   
   private static Logger LOG = LoggerFactory.getLogger(TestSolrEntityProcessorEndToEnd.class);
   
-  private static final String SOLR_SOURCE_URL = "http://localhost:8983/solr";
+  //private static final String SOLR_SOURCE_URL = "http://localhost:8983/solr";
   private static final String SOLR_CONFIG = "dataimport-solrconfig.xml";
   private static final String SOLR_SCHEMA = "dataimport-schema.xml";
   private static final String SOLR_HOME = "dih/solr";
@@ -68,29 +68,36 @@ public class TestSolrEntityProcessorEndT
     solrDoc.put("desc", "SolrDescription");
     SOLR_DOCS.add(solrDoc);
   }
-  
-  private static final String DIH_CONFIG_TAGS_INNER_ENTITY = "<dataConfig>\r\n"
-      + "  <dataSource type='MockDataSource' />\r\n"
-      + "  <document>\r\n"
-      + "    <entity name='db' query='select * from x'>\r\n"
-      + "      <field column='dbid_s' />\r\n"
-      + "      <field column='dbdesc_s' />\r\n"
-      + "      <entity name='se' processor='SolrEntityProcessor' query='id:${db.dbid_s}'\n"
-      + "     url='" + SOLR_SOURCE_URL + "' fields='id,desc'>\r\n"
-      + "        <field column='id' />\r\n"
-      + "        <field column='desc' />\r\n" + "      </entity>\r\n"
-      + "    </entity>\r\n" + "  </document>\r\n" + "</dataConfig>\r\n";
+
   
   private SolrInstance instance = null;
   private JettySolrRunner jetty;
   
-  private static String generateDIHConfig(String options) {
+  private static String getDihConfigTagsInnerEntity(int port) {
+    return  "<dataConfig>\r\n"
+        + "  <dataSource type='MockDataSource' />\r\n"
+        + "  <document>\r\n"
+        + "    <entity name='db' query='select * from x'>\r\n"
+        + "      <field column='dbid_s' />\r\n"
+        + "      <field column='dbdesc_s' />\r\n"
+        + "      <entity name='se' processor='SolrEntityProcessor' query='id:${db.dbid_s}'\n"
+        + "     url='" + getSourceUrl(port) + "' fields='id,desc'>\r\n"
+        + "        <field column='id' />\r\n"
+        + "        <field column='desc' />\r\n" + "      </entity>\r\n"
+        + "    </entity>\r\n" + "  </document>\r\n" + "</dataConfig>\r\n";
+  }
+  
+  private static String generateDIHConfig(String options, int port) {
     return "<dataConfig>\r\n" + "  <document>\r\n"
         + "    <entity name='se' processor='SolrEntityProcessor'" + "   url='"
-        + SOLR_SOURCE_URL + "' " + options + " />\r\n" + "  </document>\r\n"
+        + getSourceUrl(port) + "' " + options + " />\r\n" + "  </document>\r\n"
         + "</dataConfig>\r\n";
   }
   
+  private static String getSourceUrl(int port) {
+    return "http://localhost:" + port + "/solr";
+  }
+  
   //TODO: fix this test to close its directories
   static String savedFactory;
   @BeforeClass
@@ -107,7 +114,7 @@ public class TestSolrEntityProcessorEndT
       System.setProperty("solr.directoryFactory", savedFactory);
     }
   }
-  
+
   @Override
   @Before
   public void setUp() throws Exception {
@@ -138,7 +145,7 @@ public class TestSolrEntityProcessorEndT
     
     try {
       addDocumentsToSolr(SOLR_DOCS);
-      runFullImport(generateDIHConfig("query='*:*' rows='2' fields='id,desc' onError='skip'"));
+      runFullImport(generateDIHConfig("query='*:*' rows='2' fields='id,desc' onError='skip'", jetty.getLocalPort()));
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);
       fail(e.getMessage());
@@ -156,7 +163,7 @@ public class TestSolrEntityProcessorEndT
       addDocumentsToSolr(generateSolrDocuments(30));
       Map<String,String> map = new HashMap<String,String>();
       map.put("rows", "50");
-      runFullImport(generateDIHConfig("query='*:*' fq='desc:Description1*,desc:Description*2' rows='2'"), map);
+      runFullImport(generateDIHConfig("query='*:*' fq='desc:Description1*,desc:Description*2' rows='2'", jetty.getLocalPort()), map);
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);
       fail(e.getMessage());
@@ -171,7 +178,7 @@ public class TestSolrEntityProcessorEndT
     
     try {
       addDocumentsToSolr(generateSolrDocuments(7));
-      runFullImport(generateDIHConfig("query='*:*' fields='id' rows='2'"));
+      runFullImport(generateDIHConfig("query='*:*' fields='id' rows='2'", jetty.getLocalPort()));
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);
       fail(e.getMessage());
@@ -197,7 +204,7 @@ public class TestSolrEntityProcessorEndT
     try {
       MockDataSource.setIterator("select * from x", DB_DOCS.iterator());
       addDocumentsToSolr(SOLR_DOCS);
-      runFullImport(DIH_CONFIG_TAGS_INNER_ENTITY);
+      runFullImport(getDihConfigTagsInnerEntity(jetty.getLocalPort()));
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);
       fail(e.getMessage());
@@ -224,7 +231,7 @@ public class TestSolrEntityProcessorEndT
     assertQ(req("*:*"), "//result[@numFound='0']");
     
     try {
-      runFullImport(generateDIHConfig("query='*:*' rows='2' fields='id,desc' onError='skip'"));
+      runFullImport(generateDIHConfig("query='*:*' rows='2' fields='id,desc' onError='skip'", jetty.getLocalPort()));
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);
       fail(e.getMessage());
@@ -237,7 +244,7 @@ public class TestSolrEntityProcessorEndT
     assertQ(req("*:*"), "//result[@numFound='0']");
     
     try {
-      runFullImport(generateDIHConfig("query='bogus:3' rows='2' fields='id,desc' onError='abort'"));
+      runFullImport(generateDIHConfig("query='bogus:3' rows='2' fields='id,desc' onError='abort'", jetty.getLocalPort()));
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);
       fail(e.getMessage());
@@ -255,8 +262,7 @@ public class TestSolrEntityProcessorEndT
       addDocumentsToSolr(docList);
       Map<String,String> map = new HashMap<String,String>();
       map.put("rows", "50");
-      runFullImport(generateDIHConfig("query='*:*' rows='6' numThreads='4'"),
-          map);
+      runFullImport(generateDIHConfig("query='*:*' rows='6' numThreads='4'", jetty.getLocalPort()), map);
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);
       fail(e.getMessage());
@@ -287,7 +293,7 @@ public class TestSolrEntityProcessorEndT
     }
     
     HttpClient client = new HttpClient(new MultiThreadedHttpConnectionManager());
-    URL url = new URL(SOLR_SOURCE_URL);
+    URL url = new URL(getSourceUrl(jetty.getLocalPort()));
     CommonsHttpSolrServer solrServer = new CommonsHttpSolrServer(url, client);
     solrServer.add(sidl);
     solrServer.commit(true, true);
@@ -343,9 +349,8 @@ public class TestSolrEntityProcessorEndT
   }
   
   private JettySolrRunner createJetty(SolrInstance instance) throws Exception {
-    System.setProperty("solr.solr.home", instance.getHomeDir());
     System.setProperty("solr.data.dir", instance.getDataDir());
-    JettySolrRunner jetty = new JettySolrRunner("/solr", 8983);
+    JettySolrRunner jetty = new JettySolrRunner(instance.getHomeDir(), "/solr", 0);
     jetty.start();
     return jetty;
   }

Modified: lucene/dev/branches/lucene2858/solr/contrib/extraction/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/contrib/extraction/CHANGES.txt?rev=1235969&r1=1235968&r2=1235969&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/contrib/extraction/CHANGES.txt (original)
+++ lucene/dev/branches/lucene2858/solr/contrib/extraction/CHANGES.txt Wed Jan 25 21:56:44 2012
@@ -20,7 +20,7 @@ to your Solr Home lib directory.  See ht
 Tika Dependency
 ---------------
 
-Current Version: Tika 0.10 (released 2011-09-30)
+Current Version: Tika 1.0 (released 2011-11-07)
 
 $Id$
 
@@ -34,6 +34,8 @@ $Id$
   This is convenient when Tika's auto detector cannot detect encoding, especially
   the text file is too short to detect encoding. (koji)
 
+* SOLR-2901: Upgrade Solr to Tika 1.0 (janhoy)
+
 ================== Release 3.5.0 ==================
 
 * SOLR-2372: Upgrade Solr to Tika 0.10 (janhoy)

Modified: lucene/dev/branches/lucene2858/solr/contrib/extraction/src/java/org/apache/solr/handler/extraction/ExtractingDocumentLoader.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/contrib/extraction/src/java/org/apache/solr/handler/extraction/ExtractingDocumentLoader.java?rev=1235969&r1=1235968&r2=1235969&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/contrib/extraction/src/java/org/apache/solr/handler/extraction/ExtractingDocumentLoader.java (original)
+++ lucene/dev/branches/lucene2858/solr/contrib/extraction/src/java/org/apache/solr/handler/extraction/ExtractingDocumentLoader.java Wed Jan 25 21:56:44 2012
@@ -39,6 +39,7 @@ import org.apache.tika.exception.TikaExc
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.mime.MediaType;
 import org.apache.tika.parser.AutoDetectParser;
+import org.apache.tika.parser.DefaultParser;
 import org.apache.tika.parser.ParseContext;
 import org.apache.tika.parser.Parser;
 import org.apache.tika.sax.XHTMLContentHandler;
@@ -138,7 +139,7 @@ public class ExtractingDocumentLoader ex
     if (streamType != null) {
       //Cache?  Parsers are lightweight to construct and thread-safe, so I'm told
       MediaType mt = MediaType.parse(streamType.trim().toLowerCase(Locale.ENGLISH));
-      parser = config.getParser(mt);
+      parser = new DefaultParser(config.getMediaTypeRegistry()).getParsers().get(mt);
     } else {
       parser = autoDetectParser;
     }
@@ -151,6 +152,10 @@ public class ExtractingDocumentLoader ex
       if (resourceName != null) {
         metadata.add(Metadata.RESOURCE_NAME_KEY, resourceName);
       }
+      // Provide stream's content type as hint for auto detection
+      if(stream.getContentType() != null) {
+        metadata.add(Metadata.CONTENT_TYPE, stream.getContentType());
+      }
 
       InputStream inputStream = null;
       try {

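The DefaultParser lookup above appears to be needed because Tika 1.0 dropped the old config.getParser(mt) style of resolving a single parser per media type. A hedged standalone sketch of the same lookup (the media type is a placeholder):

    import org.apache.tika.config.TikaConfig;
    import org.apache.tika.mime.MediaType;
    import org.apache.tika.parser.DefaultParser;
    import org.apache.tika.parser.Parser;

    public class ParserLookupSketch {
      public static void main(String[] args) throws Exception {
        TikaConfig config = TikaConfig.getDefaultConfig();
        MediaType mt = MediaType.parse("application/pdf");
        // DefaultParser exposes a media-type -> parser map, as used in the patch;
        // get() returns null for unknown types, which testWrongStreamType below
        // relies on.
        Parser parser = new DefaultParser(config.getMediaTypeRegistry())
            .getParsers().get(mt);
        System.out.println(mt + " -> "
            + (parser == null ? "none" : parser.getClass().getName()));
      }
    }
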
Modified: lucene/dev/branches/lucene2858/solr/contrib/extraction/src/test/org/apache/solr/handler/extraction/ExtractingRequestHandlerTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/contrib/extraction/src/test/org/apache/solr/handler/extraction/ExtractingRequestHandlerTest.java?rev=1235969&r1=1235968&r2=1235969&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/contrib/extraction/src/test/org/apache/solr/handler/extraction/ExtractingRequestHandlerTest.java (original)
+++ lucene/dev/branches/lucene2858/solr/contrib/extraction/src/test/org/apache/solr/handler/extraction/ExtractingRequestHandlerTest.java Wed Jan 25 21:56:44 2012
@@ -18,7 +18,6 @@ package org.apache.solr.handler.extracti
 
 import java.util.ArrayList;
 import java.util.List;
-
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.util.ContentStream;
@@ -419,7 +418,33 @@ public class ExtractingRequestHandlerTes
     assertU(commit());
     assertQ(req("*:*"), "//result[@numFound=1]");
   }
+  
+  @Test
+  public void testWrongStreamType() throws Exception {
+    ExtractingRequestHandler handler = (ExtractingRequestHandler) h.getCore().getRequestHandler("/update/extract");
+    assertTrue("handler is null and it shouldn't be", handler != null);
+
+    try{
+      // Load plain text specifying another mime type, should fail
+      loadLocal("extraction/version_control.txt", 
+              "literal.id", "one",
+              ExtractingParams.STREAM_TYPE, "application/pdf"
+      );
+      fail("SolrException is expected because wrong parser specified for the file type");
+    }
+    catch(Exception expected){}
 
+    try{
+      // Load plain text specifying non existing mimetype, should fail
+      loadLocal("extraction/version_control.txt", 
+              "literal.id", "one",
+              ExtractingParams.STREAM_TYPE, "foo/bar"
+      );
+      fail("SolrException is expected because nonexsisting parser specified");
+    }
+    catch(Exception expected){}
+  }
+  
   SolrQueryResponse loadLocal(String filename, String... args) throws Exception {
     LocalSolrQueryRequest req = (LocalSolrQueryRequest) req(args);
     try {

Modified: lucene/dev/branches/lucene2858/solr/contrib/langid/src/test/org/apache/solr/update/processor/LanguageIdentifierUpdateProcessorFactoryTestCase.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/contrib/langid/src/test/org/apache/solr/update/processor/LanguageIdentifierUpdateProcessorFactoryTestCase.java?rev=1235969&r1=1235968&r2=1235969&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/contrib/langid/src/test/org/apache/solr/update/processor/LanguageIdentifierUpdateProcessorFactoryTestCase.java (original)
+++ lucene/dev/branches/lucene2858/solr/contrib/langid/src/test/org/apache/solr/update/processor/LanguageIdentifierUpdateProcessorFactoryTestCase.java Wed Jan 25 21:56:44 2012
@@ -67,7 +67,7 @@ public abstract class LanguageIdentifier
     assertLang("no", "id", "1no", "name", "Lucene", "subject", "Lucene er et fri/åpen kildekode programvarebibliotek for informasjonsgjenfinning, opprinnelig utviklet i programmeringsspråket Java av Doug Cutting. Lucene støttes av Apache Software Foundation og utgis under Apache-lisensen.");
     assertLang("en", "id", "2en", "name", "Lucene", "subject", "Apache Lucene is a free/open source information retrieval software library, originally created in Java by Doug Cutting. It is supported by the Apache Software Foundation and is released under the Apache Software License.");
     assertLang("sv", "id", "3sv", "name", "Maven", "subject", "Apache Maven är ett verktyg utvecklat av Apache Software Foundation och används inom systemutveckling av datorprogram i programspråket Java. Maven används för att automatiskt paketera (bygga) programfilerna till en distribuerbar enhet. Maven används inom samma område som Apache Ant men dess byggfiler är deklarativa till skillnad ifrån Ants skriptbaserade.");
-    assertLang("es", "id", "4es", "name", "Lucene", "subject", "Lucene es un API de código abierto para recuperación de información, originalmente implementada en Java por Doug Cutting. Está apoyado por el Apache Software Foundation y se distribuye bajo la Apache Software License. Lucene tiene versiones para otros lenguajes incluyendo Delphi, Perl, C#, C++, Python, Ruby y PHP.");
+    assertLang("es", "id", "4es", "name", "Español", "subject", "El español, como las otras lenguas romances, es una continuación moderna del latín hablado (denominado latín vulgar), desde el siglo III, que tras el desmembramiento del Imperio romano fue divergiendo de las otras variantes del latín que se hablaban en las distintas provincias del antiguo Imperio, dando lugar mediante una lenta evolución a las distintas lenguas romances. Debido a su propagación por América, el español es, con diferencia, la lengua romance que ha logrado mayor difusión.");
     assertLang("un", "id", "5un", "name", "a", "subject", "b");
    assertLang("th", "id", "6th", "name", "บทความคัดสรรเดือนนี้", "subject", "อันเนอลีส มารี อันเนอ ฟรังค์ หรือมักรู้จักในภาษาไทยว่า แอนน์ แฟรงค์ เป็นเด็กหญิงชาวยิว เกิดที่เมืองแฟรงก์เฟิร์ต ประเทศเยอรมนี เธอมีชื่อเสียงโด่งดังในฐานะผู้เขียนบันทึกประจำวันซึ่งต่อมาได้รับการตีพิมพ์เป็นหนังสือ บรรยายเหตุการณ์ขณะหลบซ่อนตัวจากการล่าชาวยิวในประเทศเนเธอร์แลนด์ ระหว่างที่ถูกเยอรมนีเข้าครอบครองในช่วงสงครามโลกครั้งที่สอง");
     assertLang("ru", "id", "7ru", "name", "Lucene", "subject", "The Apache Lucene — это свободная библиотека для высокоскоростного полнотекстового поиска, написанная на Java. Может быть использована для поиска в интернете и других областях компьютерной лингвистики (аналитическая философия).");
@@ -76,7 +76,17 @@ public abstract class LanguageIdentifier
     assertLang("nl", "id", "10nl", "name", "Lucene", "subject", "Lucene is een gratis open source, tekst gebaseerde information retrieval API van origine geschreven in Java door Doug Cutting. Het wordt ondersteund door de Apache Software Foundation en is vrijgegeven onder de Apache Software Licentie. Lucene is ook beschikbaar in andere programeertalen zoals Perl, C#, C++, Python, Ruby en PHP.");
     assertLang("it", "id", "11it", "name", "Lucene", "subject", "Lucene è una API gratuita ed open source per il reperimento di informazioni inizialmente implementata in Java da Doug Cutting. È supportata dall'Apache Software Foundation ed è resa disponibile con l'Apache License. Lucene è stata successivamente reimplementata in Perl, C#, C++, Python, Ruby e PHP.");
     assertLang("pt", "id", "12pt", "name", "Lucene", "subject", "Apache Lucene, ou simplesmente Lucene, é um software de busca e uma API de indexação de documentos, escrito na linguagem de programação Java. É um software de código aberto da Apache Software Foundation licenciado através da licença Apache.");
+    // New in Tika1.0
+    assertLang("ca", "id", "13ca", "name", "Catalan", "subject", "El català posseeix dos estàndards principals: el regulat per l'Institut d'Estudis Catalans, o estàndard general, que pren com a base l'ortografia establerta per Pompeu Fabra amb els trets gramaticals i ortogràfics característics del català central; i el regulat per l'Acadèmia Valenciana de la Llengua, estàndard d'àmbit restringit, centrat en l'estandardització del valencià i que pren com a base les Normes de Castelló, és a dir, l'ortografia de Pompeu Fabra però més adaptada a la pronúncia del català occidental i als trets que caracteritzen els dialectes valencians.");
+    assertLang("be", "id", "14be", "name", "Belarusian", "subject", "Наступнай буйной дзяржавай на беларускай зямлі было Вялікае княства Літоўскае, Рускае і Жамойцкае (ВКЛ). Падчас стварэння і пачатковага развіцця гэтай дзяржавы найбуйнейшым і асноўным яе цэнтрам быў Новагародак. Акрамя сучасных земляў Беларусі, у склад гэтай дзяржавы ўваходзілі таксама землі сучаснай Літвы, паўночная частка сучаснай Украіны і частка сучаснай Расіі.");
+    assertLang("eo", "id", "15eo", "name", "Esperanto", "subject", "La vortprovizo de Esperanto devenas plejparte el la okcidenteŭropaj lingvoj, dum ĝia sintakso kaj morfologio montras ankaŭ slavlingvan influon. La morfemoj ne ŝanĝiĝas kaj oni povas ilin preskaŭ senlime kombini, kreante diverssignifajn vortojn, Esperanto do havas multajn kunaĵojn kun la analizaj lingvoj, al kiuj apartenas ekzemple la ĉina; kontraŭe la interna strukturo de Esperanto certagrade respegulas la aglutinajn lingvojn, kiel la japanan, svahilan aŭ turkan.");
+    assertLang("gl", "id", "16gl", "name", "Galician", "subject", "A cifra de falantes medrou axiña durante as décadas seguintes, nun principio no Imperio ruso e na Europa oriental, logo na Europa occidental, América, China e no Xapón. Nos primeiros anos do movemento, os esperantistas mantiñan contacto por correspondencia, pero en 1905 o primeiro Congreso Universal de Esperanto levouse a cabo na cidade francesa de Boulogne-sur-Mer. Dende entón, os congresos mundiais organizáronse nos cinco continentes ano tras ano agás durante as dúas Guerras Mundiais.");
+    assertLang("ro", "id", "17ro", "name", "Romanian", "subject", "La momentul destrămării Uniunii Sovietice și a înlăturării regimului comunist instalat în România (1989), țara a inițiat o serie de reforme economice și politice. După un deceniu de probleme economice, România a introdus noi reforme economice de ordin general (precum cota unică de impozitare, în 2005) și a aderat la Uniunea Europeană la 1 ianuarie 2007.");
+    assertLang("sk", "id", "18sk", "name", "Slovakian", "subject", "Boli vytvorené dva národné parlamenty - Česká národná rada a Slovenská národná rada a spoločný jednokomorový česko-slovenský parlament bol premenovaný z Národného zhromaždenia na Federálne zhromaždenie s dvoma komorami - Snemovňou ľudu a Snemovňu národov.");
+    assertLang("sl", "id", "19sl", "name", "Slovenian", "subject", "Slovenska Wikipedija je različica spletne enciklopedije Wikipedije v slovenskem jeziku. Projekt slovenske Wikipedije se je začel 26. februarja 2002 z ustanovitvijo njene spletne strani, njen pobudnik pa je bil uporabnik Jani Melik.");
+    assertLang("uk", "id", "20uk", "name", "Ukrainian", "subject", "Народно-господарський комплекс країни включає такі види промисловості як важке машинобудування, чорна та кольорова металургія, суднобудування, виробництво автобусів, легкових та вантажних автомобілів, тракторів та іншої сільськогосподарської техніки, тепловозів, верстатів, турбін, авіаційних двигунів та літаків, обладнання для електростанцій, нафто-газової та хімічної промисловості тощо. Крім того, Україна є потужним виробником електроенергії. Україна має розвинуте сільське господарство і займає одне з провідних місць серед експортерів деяких видів сільськогосподарської продукції і продовольства (зокрема, соняшникової олії).");
   }
+    
   
   @Test
   public void testMapFieldName() throws Exception {

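The assertions above go through Solr's langid processor down to Tika's LanguageIdentifier; the Catalan through Ukrainian entries cover language profiles that are new in Tika 1.0. A minimal sketch of the underlying Tika API (assuming tika-core 1.0 on the classpath; the sample text is truncated here):

    import org.apache.tika.language.LanguageIdentifier;

    public class LangIdSketch {
      public static void main(String[] args) {
        LanguageIdentifier id =
            new LanguageIdentifier("El català posseeix dos estàndards principals ...");
        System.out.println(id.getLanguage());          // expected: "ca"
        System.out.println(id.isReasonablyCertain());  // detection confidence flag
      }
    }
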
Modified: lucene/dev/branches/lucene2858/solr/core/build.xml
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/build.xml?rev=1235969&r1=1235968&r2=1235969&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/build.xml (original)
+++ lucene/dev/branches/lucene2858/solr/core/build.xml Wed Jan 25 21:56:44 2012
@@ -27,6 +27,6 @@
                                  jar.file="${common-solr.dir}/lib/commons-csv-1.0-SNAPSHOT-r966014.jar" />
 
     <m2-deploy-with-pom-template pom.xml="${common-solr.dir}/lib/apache-solr-noggit-pom.xml.template"
-                                 jar.file="${common-solr.dir}/lib/apache-solr-noggit-r1209632.jar" />
+                                 jar.file="${common-solr.dir}/lib/apache-solr-noggit-r1211150.jar" />
   </target>
 </project>

Modified: lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/analysis/TokenizerChain.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/analysis/TokenizerChain.java?rev=1235969&r1=1235968&r2=1235969&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/analysis/TokenizerChain.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/analysis/TokenizerChain.java Wed Jan 25 21:56:44 2012
@@ -19,6 +19,7 @@ package org.apache.solr.analysis;
 
 import org.apache.lucene.analysis.*;
 
+import java.io.IOException;
 import java.io.Reader;
 
 /**
@@ -48,6 +49,21 @@ public final class TokenizerChain extend
   public TokenizerFactory getTokenizerFactory() { return tokenizer; }
   public TokenFilterFactory[] getTokenFilterFactories() { return filters; }
 
+  class SolrTokenStreamComponents extends TokenStreamComponents {
+    public SolrTokenStreamComponents(final Tokenizer source, final TokenStream result) {
+      super(source, result);
+    }
+
+    @Override
+    protected void reset(Reader reader) throws IOException {
+      // the tokenizers are currently reset by the indexing process, so only
+      // the tokenizer needs to be reset.
+      Reader r = initReader(reader);
+      super.reset(r);
+    }
+  }
+  
+  
   @Override
   public Reader initReader(Reader reader) {
     if (charFilters != null && charFilters.length > 0) {
@@ -62,12 +78,12 @@ public final class TokenizerChain extend
 
   @Override
   protected TokenStreamComponents createComponents(String fieldName, Reader aReader) {
-    Tokenizer tk = tokenizer.create(aReader);
+    Tokenizer tk = tokenizer.create( initReader(aReader) );
     TokenStream ts = tk;
     for (TokenFilterFactory filter : filters) {
       ts = filter.create(ts);
     }
-    return new TokenStreamComponents(tk, ts);
+    return new SolrTokenStreamComponents(tk, ts);
   }
 
   @Override

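The net effect of the TokenizerChain change above is that CharFilters are applied through initReader() before the Tokenizer is created, and reapplied when the stream is reset. A hedged sketch of the same wrapping order in a standalone Analyzer (the packages and Version constant assume this branch's 4.0-dev analysis API):

    import java.io.Reader;
    import org.apache.lucene.analysis.Analyzer;
    import org.apache.lucene.analysis.TokenStream;
    import org.apache.lucene.analysis.Tokenizer;
    import org.apache.lucene.analysis.core.LowerCaseFilter;
    import org.apache.lucene.analysis.core.WhitespaceTokenizer;
    import org.apache.lucene.util.Version;

    public final class ChainOrderSketch extends Analyzer {
      @Override
      protected TokenStreamComponents createComponents(String fieldName, Reader reader) {
        // Mirrors tokenizer.create(initReader(aReader)) above: any CharFilters
        // wrap the Reader first, so the Tokenizer only sees the filtered stream.
        Tokenizer source = new WhitespaceTokenizer(Version.LUCENE_40, initReader(reader));
        TokenStream sink = new LowerCaseFilter(Version.LUCENE_40, source);
        return new TokenStreamComponents(source, sink);
      }
    }
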
Modified: lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java?rev=1235969&r1=1235968&r2=1235969&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java Wed Jan 25 21:56:44 2012
@@ -34,6 +34,7 @@ import org.mortbay.jetty.servlet.Context
 import org.mortbay.jetty.servlet.FilterHolder;
 import org.mortbay.jetty.servlet.HashSessionIdManager;
 import org.mortbay.log.Logger;
+import org.mortbay.thread.QueuedThreadPool;
 
 /**
  * Run solr using jetty
@@ -48,30 +49,76 @@ public class JettySolrRunner {
   String context;
 
   private String solrConfigFilename;
+  private String schemaFilename;
 
   private boolean waitOnSolr = false;
 
-  public JettySolrRunner(String context, int port) {
-    this.init(context, port);
+  private int lastPort = -1;
+
+  private String shards;
+
+  private String dataDir;
+  
+  private volatile boolean startedBefore = false;
+
+  private String solrHome;
+
+  private boolean stopAtShutdown;
+
+  public JettySolrRunner(String solrHome, String context, int port) {
+    this.init(solrHome, context, port, true);
   }
 
-  public JettySolrRunner(String context, int port, String solrConfigFilename) {
-    this.init(context, port);
+  public JettySolrRunner(String solrHome, String context, int port, String solrConfigFilename, String schemaFileName) {
+    this.init(solrHome, context, port, true);
+    this.solrConfigFilename = solrConfigFilename;
+    this.schemaFilename = schemaFileName;
+  }
+  
+  public JettySolrRunner(String solrHome, String context, int port,
+      String solrConfigFilename, String schemaFileName, boolean stopAtShutdown) {
+    this.init(solrHome, context, port, stopAtShutdown);
     this.solrConfigFilename = solrConfigFilename;
+    this.schemaFilename = schemaFileName;
   }
 
-  private void init(String context, int port) {
+  private void init(String solrHome, String context, int port, boolean stopAtShutdown) {
     this.context = context;
     server = new Server(port);
-    server.setStopAtShutdown(true);
+    this.solrHome = solrHome;
+    this.stopAtShutdown = stopAtShutdown;
+    server.setStopAtShutdown(stopAtShutdown);
+    if (!stopAtShutdown) {
+      server.setGracefulShutdown(0);
+    }
+    System.setProperty("solr.solr.home", solrHome);
     if (System.getProperty("jetty.testMode") != null) {
       // SelectChannelConnector connector = new SelectChannelConnector();
       // Normal SocketConnector is what solr's example server uses by default
       SocketConnector connector = new SocketConnector();
       connector.setPort(port);
       connector.setReuseAddress(true);
-      server.setConnectors(new Connector[] { connector });
+      if (!stopAtShutdown) {
+        QueuedThreadPool threadPool = (QueuedThreadPool) connector
+            .getThreadPool();
+        if (threadPool != null) {
+          threadPool.setMaxStopTimeMs(100);
+        }
+      }
+      server.setConnectors(new Connector[] {connector});
       server.setSessionIdManager(new HashSessionIdManager(new Random()));
+    } else {
+      if (!stopAtShutdown) {
+        for (Connector connector : server.getConnectors()) {
+          if (connector instanceof SocketConnector) {
+            QueuedThreadPool threadPool = (QueuedThreadPool) ((SocketConnector) connector)
+                .getThreadPool();
+            if (threadPool != null) {
+              threadPool.setMaxStopTimeMs(100);
+            }
+          }
+        }
+      }
     }
 
     // Initialize the servlets
@@ -92,13 +139,20 @@ public class JettySolrRunner {
       }
 
       public void lifeCycleStarted(LifeCycle arg0) {
-        System.setProperty("hostPort", Integer.toString(getLocalPort()));
-        if (solrConfigFilename != null)
-          System.setProperty("solrconfig", solrConfigFilename);
+        lastPort = getFirstConnectorPort();
+        System.setProperty("hostPort", Integer.toString(lastPort));
+        if (solrConfigFilename != null) System.setProperty("solrconfig",
+            solrConfigFilename);
+        if (schemaFilename != null) System.setProperty("schema", 
+            schemaFilename);
+//        SolrDispatchFilter filter = new SolrDispatchFilter();
+//        FilterHolder fh = new FilterHolder(filter);
         dispatchFilter = root.addFilter(SolrDispatchFilter.class, "*",
             Handler.REQUEST);
-        if (solrConfigFilename != null)
-          System.clearProperty("solrconfig");
+        if (solrConfigFilename != null) System.clearProperty("solrconfig");
+        if (schemaFilename != null) System.clearProperty("schema");
+        System.clearProperty("solr.solr.home");
+        
       }
 
       public void lifeCycleFailure(LifeCycle arg0, Throwable arg1) {
@@ -111,6 +165,18 @@ public class JettySolrRunner {
 
   }
 
+  public FilterHolder getDispatchFilter() {
+    return dispatchFilter;
+  }
+
+  public boolean isRunning() {
+    return server.isRunning();
+  }
+  
+  public boolean isStopped() {
+    return server.isStopped();
+  }
+
   // ------------------------------------------------------------------------------------------------
   // ------------------------------------------------------------------------------------------------
 
@@ -119,6 +185,21 @@ public class JettySolrRunner {
   }
 
   public void start(boolean waitForSolr) throws Exception {
+    // if started before, make a new server
+    if (startedBefore) {
+      waitOnSolr = false;
+      init(solrHome, context, lastPort, stopAtShutdown);
+    } else {
+      startedBefore = true;
+    }
+    
+    if( dataDir != null) {
+      System.setProperty("solr.data.dir", dataDir);
+    }
+    if(shards != null) {
+      System.setProperty("shard", shards);
+    }
+    
     if (!server.isRunning()) {
       server.start();
     }
@@ -131,27 +212,42 @@ public class JettySolrRunner {
         }
       }
     }
+    
+    System.clearProperty("shard");
+    System.clearProperty("solr.data.dir");
   }
 
   public void stop() throws Exception {
-    if (server.isRunning()) {
+    if (!server.isStopped() && !server.isStopping()) {
       server.stop();
-      server.join();
     }
+    server.join();
   }
 
   /**
-   * Returns the Local Port of the first Connector found for the jetty Server.
+   * Returns the Local Port of the jetty Server.
    * 
    * @exception RuntimeException if there is no Connector
    */
-  public int getLocalPort() {
+  private int getFirstConnectorPort() {
     Connector[] conns = server.getConnectors();
     if (0 == conns.length) {
       throw new RuntimeException("Jetty Server has no Connectors");
     }
     return conns[0].getLocalPort();
   }
+  
+  /**
+   * Returns the Local Port of the jetty Server.
+   * 
+   * @exception RuntimeException if there is no Connector
+   */
+  public int getLocalPort() {
+    if (lastPort == -1) {
+      throw new IllegalStateException("You cannot get the port until this instance has started");
+    }
+    return lastPort;
+  }
 
   // --------------------------------------------------------------
   // --------------------------------------------------------------
@@ -172,12 +268,20 @@ public class JettySolrRunner {
    */
   public static void main(String[] args) {
     try {
-      JettySolrRunner jetty = new JettySolrRunner("/solr", 8983);
+      JettySolrRunner jetty = new JettySolrRunner(".", "/solr", 8983);
       jetty.start();
     } catch (Exception ex) {
       ex.printStackTrace();
     }
   }
+
+  public void setShards(String shardList) {
+     this.shards = shardList;
+  }
+
+  public void setDataDir(String dataDir) {
+    this.dataDir = dataDir;
+  }
 }
 
 class NoLog implements Logger {

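Taken together, the JettySolrRunner changes above let tests bind to an ephemeral port and recover the real one after startup. A minimal usage sketch (the solr home path is a placeholder):

    import org.apache.solr.client.solrj.embedded.JettySolrRunner;

    public class JettyUsageSketch {
      public static void main(String[] args) throws Exception {
        // Port 0 asks the OS for a free port; getLocalPort() reports it, but
        // only after start() -- it throws IllegalStateException beforehand.
        JettySolrRunner jetty = new JettySolrRunner("/path/to/solr/home", "/solr", 0);
        jetty.start();
        String url = "http://localhost:" + jetty.getLocalPort() + "/solr";
        System.out.println("Solr running at " + url);
        jetty.stop(); // stop() now also join()s the server thread
      }
    }
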
Modified: lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java?rev=1235969&r1=1235968&r2=1235969&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java Wed Jan 25 21:56:44 2012
@@ -1,8 +1,5 @@
 package org.apache.solr.cloud;
 
-import org.apache.solr.common.params.SolrParams;
-
-
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -20,11 +17,14 @@ import org.apache.solr.common.params.Sol
  * limitations under the License.
  */
 
+import org.apache.solr.common.params.SolrParams;
+
 public class CloudDescriptor {
   private String shardId;
   private String collectionName;
   private SolrParams params;
-
+  private String roles = "";
+  
   public void setShardId(String shardId) {
     this.shardId = shardId;
   }
@@ -41,6 +41,14 @@ public class CloudDescriptor {
     this.collectionName = collectionName;
   }
 
+  public String getRoles(){
+	  return roles;
+  }
+  
+  public void setRoles(String roles){
+	  this.roles = roles;
+  }
+  
   /** Optional parameters that can change how a core is created. */
   public SolrParams getParams() {
     return params;

Modified: lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1235969&r1=1235968&r2=1235969&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/cloud/ZkController.java Wed Jan 25 21:56:44 2012
@@ -20,24 +20,36 @@ package org.apache.solr.cloud;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeoutException;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.CloudState;
+import org.apache.solr.common.cloud.CoreState;
 import org.apache.solr.common.cloud.OnReconnect;
 import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkCmdExecutor;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.cloud.ZooKeeperException;
 import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.CoreDescriptor;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.update.UpdateLog;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,7 +73,7 @@ public final class ZkController {
   private final static Pattern URL_POST = Pattern.compile("https?://(.*)");
   private final static Pattern URL_PREFIX = Pattern.compile("(https?://).*");
 
-
+  
   // package private for tests
 
   static final String CONFIGS_ZKNODE = "/configs";
@@ -69,10 +81,14 @@ public final class ZkController {
   public final static String COLLECTION_PARAM_PREFIX="collection.";
   public final static String CONFIGNAME_PROP="configName";
 
-  private SolrZkClient zkClient;
+  private final Map<String, CoreState> coreStates = Collections.synchronizedMap(new HashMap<String, CoreState>());
   
+  private SolrZkClient zkClient;
+  private ZkCmdExecutor cmdExecutor;
   private ZkStateReader zkStateReader;
 
+  private LeaderElector leaderElector;
+  
   private String zkServerAddress;
 
   private String localHostPort;
@@ -82,20 +98,61 @@ public final class ZkController {
 
   private String hostName;
 
+  private LeaderElector overseerElector;
+  
+  private boolean SKIP_AUTO_RECOVERY = Boolean.getBoolean("solrcloud.skip.autorecovery");
+
+  // this can be null in which case recovery will be inactive
+  private CoreContainer cc;
+
+  public static void main(String[] args) throws Exception {
+    // start up a tmp zk server first
+    String zkServerAddress = args[0];
+    
+    String solrHome = args[1];
+    String solrPort = args[2];
+    
+    String confDir = args[3];
+    String confName = args[4];
+    
+    SolrZkServer zkServer = new SolrZkServer("true", null, solrHome, solrPort);
+    zkServer.parseConfig();
+    zkServer.start();
+    
+    SolrZkClient zkClient = new SolrZkClient(zkServerAddress, 15000, 5000,
+        new OnReconnect() {
+          @Override
+          public void command() {
+          }});
+    
+    uploadConfigDir(zkClient, new File(confDir), confName);
+    
+    zkServer.stop();
+  }
+
+
   /**
-   * @param zkServerAddress ZooKeeper server host address
+   * @param coreContainer if null, recovery will not be enabled
+   * @param zkServerAddress
    * @param zkClientTimeout
    * @param zkClientConnectTimeout
    * @param localHost
    * @param locaHostPort
    * @param localHostContext
+   * @param numShards 
    * @throws InterruptedException
    * @throws TimeoutException
    * @throws IOException
    */
-  public ZkController(String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout, String localHost, String locaHostPort,
-      String localHostContext) throws InterruptedException,
+  public ZkController(CoreContainer cc, String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout, String localHost, String locaHostPort,
+      String localHostContext, final CurrentCoreDescriptorProvider registerOnReconnect) throws InterruptedException,
       TimeoutException, IOException {
+    this.cc = cc;
+    if (localHostContext.contains("/")) {
+      throw new IllegalArgumentException("localHostContext ("
+          + localHostContext + ") should not contain a /");
+    }
+    
     this.zkServerAddress = zkServerAddress;
     this.localHostPort = locaHostPort;
     this.localHostContext = localHostContext;
@@ -107,68 +164,61 @@ public final class ZkController {
 
           public void command() {
             try {
-              zkStateReader.makeCollectionsNodeWatches();
-              zkStateReader.makeShardsWatches(true);
+              // we need to create all of our lost watches
+              
+              // seems we dont need to do this again...
+              //Overseer.createClientNodes(zkClient, getNodeName());
+
+              ElectionContext context = new OverseerElectionContext(getNodeName(), zkClient, zkStateReader);
+              overseerElector.joinElection(context);
+              zkStateReader.createClusterStateWatchersAndUpdate();
+              
+              List<CoreDescriptor> descriptors = registerOnReconnect
+                  .getCurrentDescriptors();
+              if (descriptors != null) {
+                // before registering as live, make sure everyone is in a
+                // recovery state
+                for (CoreDescriptor descriptor : descriptors) {
+                  final String shardZkNodeName = getNodeName() + "_"
+                      + descriptor.getName();
+                  publishAsDown(getBaseUrl(), descriptor, shardZkNodeName,
+                      descriptor.getName());
+                }
+              }
+              
+              // we have to register as live first to pick up docs in the buffer
               createEphemeralLiveNode();
-              zkStateReader.updateCloudState(false);
-            } catch (KeeperException e) {
-              log.error("", e);
-              throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
-                  "", e);
+              
+              // re register all descriptors
+              if (descriptors != null) {
+                for (CoreDescriptor descriptor : descriptors) {
+                  // TODO: we need to think carefully about what happens when it was
+                  // a leader that was expired - as well as what to do about leaders/overseers
+                  // with connection loss
+                  register(descriptor.getName(), descriptor, true);
+                }
+              }
+  
             } catch (InterruptedException e) {
               // Restore the interrupted status
               Thread.currentThread().interrupt();
-              log.error("", e);
-              throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
-                  "", e);
-            } catch (IOException e) {
-              log.error("", e);
-              throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
-                  "", e);
+              throw new ZooKeeperException(
+                  SolrException.ErrorCode.SERVER_ERROR, "", e);
+            } catch (Exception e) {
+              SolrException.log(log, "", e);
+              throw new ZooKeeperException(
+                  SolrException.ErrorCode.SERVER_ERROR, "", e);
             }
 
           }
         });
+    cmdExecutor = new ZkCmdExecutor();
+    leaderElector = new LeaderElector(zkClient);
     zkStateReader = new ZkStateReader(zkClient);
     init();
   }
 
   /**
-   * @param shardId
-   * @param collection
-   * @throws IOException
-   * @throws InterruptedException 
-   * @throws KeeperException 
-   */
-  private void addZkShardsNode(String shardId, String collection) throws IOException, InterruptedException, KeeperException {
-
-    String shardsZkPath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + ZkStateReader.SHARDS_ZKNODE + "/" + shardId;
-    
-    try {
-      
-      // shards node
-      if (!zkClient.exists(shardsZkPath)) {
-        if (log.isInfoEnabled()) {
-          log.info("creating zk shards node:" + shardsZkPath);
-        }
-        // makes shards zkNode if it doesn't exist
-        zkClient.makePath(shardsZkPath, CreateMode.PERSISTENT, null);
-        
-        // TODO: consider how these notifications are being done
-        // ping that there is a new shardId
-        zkClient.setData(ZkStateReader.COLLECTIONS_ZKNODE, (byte[])null);
-
-      }
-    } catch (KeeperException e) {
-      // its okay if another beats us creating the node
-      if (e.code() != KeeperException.Code.NODEEXISTS) {
-        throw e;
-      }
-    }
-
-  }
-
-  /**
    * Closes the underlying ZooKeeper client.
    */
   public void close() {
@@ -177,7 +227,7 @@ public final class ZkController {
     } catch (InterruptedException e) {
       // Restore the interrupted status
       Thread.currentThread().interrupt();
-      log.error("", e);
+      log.warn("", e);
       throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
           "", e);
     }
@@ -192,7 +242,7 @@ public final class ZkController {
    */
   public boolean configFileExists(String collection, String fileName)
       throws KeeperException, InterruptedException {
-    Stat stat = zkClient.exists(CONFIGS_ZKNODE + "/" + collection + "/" + fileName, null);
+    Stat stat = zkClient.exists(CONFIGS_ZKNODE + "/" + collection + "/" + fileName, null, true);
     return stat != null;
   }
 
@@ -213,7 +263,7 @@ public final class ZkController {
   public byte[] getConfigFileData(String zkConfigName, String fileName)
       throws KeeperException, InterruptedException {
     String zkPath = CONFIGS_ZKNODE + "/" + zkConfigName + "/" + fileName;
-    byte[] bytes = zkClient.getData(zkPath, null, null);
+    byte[] bytes = zkClient.getData(zkPath, null, null, true);
     if (bytes == null) {
       log.error("Config file contains no data:" + zkPath);
       throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
@@ -271,20 +321,17 @@ public final class ZkController {
       }
       
       // makes nodes zkNode
-      try {
-        zkClient.makePath(ZkStateReader.LIVE_NODES_ZKNODE);
-      } catch (KeeperException e) {
-        // its okay if another beats us creating the node
-        if (e.code() != KeeperException.Code.NODEEXISTS) {
-          log.error("", e);
-          throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
-              "", e);
-        }
-      }
+      cmdExecutor.ensureExists(ZkStateReader.LIVE_NODES_ZKNODE, zkClient);
       
+      Overseer.createClientNodes(zkClient, getNodeName());
       createEphemeralLiveNode();
-      setUpCollectionsNode();
-      zkStateReader.makeCollectionsNodeWatches();
+      cmdExecutor.ensureExists(ZkStateReader.COLLECTIONS_ZKNODE, zkClient);
+
+      overseerElector = new LeaderElector(zkClient);
+      ElectionContext context = new OverseerElectionContext(getNodeName(), zkClient, zkStateReader);
+      overseerElector.setup(context);
+      overseerElector.joinElection(context);
+      zkStateReader.createClusterStateWatchersAndUpdate();
       
     } catch (IOException e) {
       log.error("", e);
@@ -303,53 +350,17 @@ public final class ZkController {
     }
 
   }
+  
+  public boolean isConnected() {
+    return zkClient.isConnected();
+  }
 
   private void createEphemeralLiveNode() throws KeeperException,
       InterruptedException {
     String nodeName = getNodeName();
     String nodePath = ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeName;
     log.info("Register node as live in ZooKeeper:" + nodePath);
-    Watcher liveNodeWatcher = new Watcher() {
-
-      public void process(WatchedEvent event) {
-        try {
-          log.info("Updating live nodes:" + zkClient);
-          try {
-            zkStateReader.updateLiveNodes();
-          } finally {
-            // re-make watch
-
-            String path = event.getPath();
-            if(path == null) {
-              // on shutdown, it appears this can trigger with a null path
-              log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
-              return;
-            }
-            zkClient.getChildren(event.getPath(), this);
-          }
-        } catch (KeeperException e) {
-          if(e.code() == KeeperException.Code.SESSIONEXPIRED || e.code() == KeeperException.Code.CONNECTIONLOSS) {
-            log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
-            return;
-          }
-          log.error("", e);
-          throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
-              "", e);
-        } catch (InterruptedException e) {
-          // Restore the interrupted status
-          Thread.currentThread().interrupt();
-          log.error("", e);
-          throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
-              "", e);
-        } catch (IOException e) {
-          log.error("", e);
-          throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
-              "", e);
-        }
-        
-      }
-      
-    };
+   
     try {
       boolean nodeDeleted = true;
       try {
@@ -358,7 +369,7 @@ public final class ZkController {
         // until expiration timeout - so a node won't be created here because
         // it exists, but eventually the node will be removed. So delete
         // in case it exists and create a new node.
-        zkClient.delete(nodePath, -1);
+        zkClient.delete(nodePath, -1, true);
       } catch (KeeperException.NoNodeException e) {
         // fine if there is nothing to delete
         // TODO: annoying that ZK logs a warning on us
@@ -369,25 +380,17 @@ public final class ZkController {
             .info("Found a previous node that still exists while trying to register a new live node "
                 + nodePath + " - removing existing node to create another.");
       }
-      zkClient.makePath(nodePath, CreateMode.EPHEMERAL);
+      zkClient.makePath(nodePath, CreateMode.EPHEMERAL, true);
     } catch (KeeperException e) {
      // it's okay if the node already exists
       if (e.code() != KeeperException.Code.NODEEXISTS) {
         throw e;
       }
-    }
-    zkClient.getChildren(ZkStateReader.LIVE_NODES_ZKNODE, liveNodeWatcher);
-    try {
-      zkStateReader.updateLiveNodes();
-    } catch (IOException e) {
-      log.error("", e);
-      throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
-          "", e);
-    }
+    }    
   }
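
The delete-then-create guard above handles an ephemeral node left over from
an expired session that ZooKeeper has not yet reaped. As a standalone sketch
against the raw ZooKeeper API (the handle and path are assumptions, not part
of this patch):

    import org.apache.zookeeper.*;

    // sketch: register an ephemeral "live" node, tolerating a leftover
    // node from a previous session that expired but was not yet removed
    static void createLiveNode(ZooKeeper zk, String nodePath)
        throws KeeperException, InterruptedException {
      try {
        zk.delete(nodePath, -1); // -1 matches any node version
      } catch (KeeperException.NoNodeException e) {
        // fine - nothing left over to delete
      }
      try {
        zk.create(nodePath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
            CreateMode.EPHEMERAL);
      } catch (KeeperException.NodeExistsException e) {
        // fine - another same-session retry beat us to it
      }
    }
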
   
   public String getNodeName() {
-    return hostName + ":" + localHostPort + "_"+ localHostContext;
+    return hostName + ":" + localHostPort + "_" + localHostContext;
   }
 
   /**
@@ -398,7 +401,7 @@ public final class ZkController {
    */
   public boolean pathExists(String path) throws KeeperException,
       InterruptedException {
-    return zkClient.exists(path);
+    return zkClient.exists(path, true);
   }
 
   /**
@@ -417,15 +420,14 @@ public final class ZkController {
     if (log.isInfoEnabled()) {
       log.info("Load collection config from:" + path);
     }
-    byte[] data = zkClient.getData(path, null, null);
-    ZkNodeProps props = new ZkNodeProps();
+    byte[] data = zkClient.getData(path, null, null, true);
     
     if(data != null) {
-      props.load(data);
+      ZkNodeProps props = ZkNodeProps.load(data);
       configName = props.get(CONFIGNAME_PROP);
     }
     
-    if (configName != null && !zkClient.exists(CONFIGS_ZKNODE + "/" + configName)) {
+    if (configName != null && !zkClient.exists(CONFIGS_ZKNODE + "/" + configName, true)) {
       log.error("Specified config does not exist in ZooKeeper:" + configName);
       throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
           "Specified config does not exist in ZooKeeper:" + configName);
@@ -434,67 +436,224 @@ public final class ZkController {
     return configName;
   }
 
+
   /**
    * Register shard with ZooKeeper.
    * 
    * @param coreName
-   * @param cloudDesc
+   * @param desc
-   * @param forcePropsUpdate update solr.xml core props even if the shard is already registered
-   * @throws IOException
-   * @throws KeeperException
-   * @throws InterruptedException
+   * @return the shard id the core was registered under
+   */
+  public String register(String coreName, final CoreDescriptor desc) throws Exception {  
+    return register(coreName, desc, false);
+  }
+  
+
+  /**
+   * Register shard with ZooKeeper.
+   * 
+   * @param coreName the name of the core to register
+   * @param desc the descriptor of the core to register
+   * @param recoverReloadedCores if true, reloaded cores also go through recovery
+   * @return the shard id the core was registered under
+   */
-  public void register(String coreName, CloudDescriptor cloudDesc, boolean forcePropsUpdate) throws IOException,
-      KeeperException, InterruptedException {
-    String shardUrl = localHostName + ":" + localHostPort + "/" + localHostContext
-        + "/" + coreName;
+  public String register(String coreName, final CoreDescriptor desc, boolean recoverReloadedCores) throws Exception {  
+    final String baseUrl = getBaseUrl();
     
-    String collection = cloudDesc.getCollectionName();
+    final CloudDescriptor cloudDesc = desc.getCloudDescriptor();
+    final String collection = cloudDesc.getCollectionName();
     
-    String shardsZkPath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + ZkStateReader.SHARDS_ZKNODE + "/" + cloudDesc.getShardId();
 
-    boolean shardZkNodeAlreadyExists = zkClient.exists(shardsZkPath);
-    
-    if(shardZkNodeAlreadyExists && !forcePropsUpdate) {
-      return;
-    }
+    log.info("Attempting to update " + ZkStateReader.CLUSTER_STATE + " version "
+        + null);
+    CloudState state = CloudState.load(zkClient, zkStateReader.getCloudState().getLiveNodes());
+
+    final String coreZkNodeName = getNodeName() + "_" + coreName;
     
+    String shardId = cloudDesc.getShardId();
+
+    Map<String,String> props = new HashMap<String,String>();
+    props.put(ZkStateReader.BASE_URL_PROP, baseUrl);
+    props.put(ZkStateReader.CORE_NAME_PROP, coreName);
+    props.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
+    props.put(ZkStateReader.ROLES_PROP, cloudDesc.getRoles());
+    props.put(ZkStateReader.STATE_PROP, ZkStateReader.DOWN);
+
     if (log.isInfoEnabled()) {
-      log.info("Register shard - core:" + coreName + " address:"
-          + shardUrl);
+        log.info("Register shard - core:" + coreName + " address:"
+            + baseUrl + " shardId:" + shardId);
     }
 
-    ZkNodeProps props = new ZkNodeProps();
-    props.put(ZkStateReader.URL_PROP, shardUrl);
+    // we only put a subset of props into the leader node
+    ZkNodeProps leaderProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP,
+        props.get(ZkStateReader.BASE_URL_PROP), ZkStateReader.CORE_NAME_PROP,
+        props.get(ZkStateReader.CORE_NAME_PROP), ZkStateReader.NODE_NAME_PROP,
+        props.get(ZkStateReader.NODE_NAME_PROP));
     
-    props.put(ZkStateReader.NODE_NAME, getNodeName());
 
-    byte[] bytes = props.store();
+    joinElection(collection, coreZkNodeName, shardId, leaderProps);
+    
+    String leaderUrl = zkStateReader.getLeaderUrl(collection,
+        cloudDesc.getShardId(), 30000);
+    
+    String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
+    log.info("We are " + ourUrl + " and leader is " + leaderUrl);
+    boolean isLeader = leaderUrl.equals(ourUrl);
     
-    String shardZkNodeName = getNodeName() + "_" + coreName;
 
-    if(shardZkNodeAlreadyExists && forcePropsUpdate) {
-      zkClient.setData(shardsZkPath + "/" + shardZkNodeName, bytes);
-      // tell everyone to update cloud info
-      zkClient.setData(ZkStateReader.COLLECTIONS_ZKNODE, (byte[])null);
-    } else {
-      addZkShardsNode(cloudDesc.getShardId(), collection);
+    SolrCore core = null;
+    if (cc != null) { // CoreContainer only null in tests
       try {
-        zkClient.create(shardsZkPath + "/" + shardZkNodeName, bytes,
-            CreateMode.PERSISTENT);
-        // tell everyone to update cloud info
-        zkClient.setData(ZkStateReader.COLLECTIONS_ZKNODE, (byte[])null);
-      } catch (KeeperException e) {
-        // its okay if the node already exists
-        if (e.code() != KeeperException.Code.NODEEXISTS) {
-          throw e;
+        core = cc.getCore(desc.getName());
+
+        boolean startRecovery = checkRecovery(coreName, desc, recoverReloadedCores, isLeader, cloudDesc,
+            collection, coreZkNodeName, shardId, leaderProps, core, cc);
+        if (!startRecovery) {
+          publishAsActive(baseUrl, desc, coreZkNodeName, coreName);
+        }
+      } finally {
+        if (core != null) {
+          core.close();
+        }
+      }
+    } else {
+      publishAsActive(baseUrl, desc, coreZkNodeName, coreName);
+    }
+    
+    // make sure we have an updated cluster state right away
+    zkStateReader.updateCloudState(true);
+
+    return shardId;
+  }
+
+
+  private void joinElection(final String collection,
+      final String shardZkNodeName, String shardId, ZkNodeProps leaderProps) throws InterruptedException, KeeperException, IOException {
+    ElectionContext context = new ShardLeaderElectionContext(leaderElector, shardId,
+        collection, shardZkNodeName, leaderProps, this, cc);
+    
+    leaderElector.setup(context);
+    leaderElector.joinElection(context);
+  }
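
LeaderElector and ShardLeaderElectionContext are Solr-internal classes; the
underlying ZooKeeper leader-election recipe they resemble - each candidate
creates an EPHEMERAL_SEQUENTIAL node and the lowest sequence number leads -
can be sketched as follows (the election path is hypothetical):

    import java.util.Collections;
    import java.util.List;
    import org.apache.zookeeper.*;

    // sketch of the classic ZooKeeper election recipe; a full implementation
    // would also watch the next-lowest node to detect leader failure
    static boolean joinElection(ZooKeeper zk, String electionPath)
        throws KeeperException, InterruptedException {
      String created = zk.create(electionPath + "/n_", new byte[0],
          ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
      String myName = created.substring(created.lastIndexOf('/') + 1);
      List<String> candidates = zk.getChildren(electionPath, false);
      Collections.sort(candidates); // padded sequence suffixes sort lexicographically
      return candidates.get(0).equals(myName); // lowest sequence is the leader
    }
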
+
+
+  private boolean checkRecovery(String coreName, final CoreDescriptor desc,
+      boolean recoverReloadedCores, final boolean isLeader,
+      final CloudDescriptor cloudDesc, final String collection,
+      final String shardZkNodeName, String shardId, ZkNodeProps leaderProps,
+      SolrCore core, CoreContainer cc) throws InterruptedException,
+      KeeperException, IOException, ExecutionException {
+
+    boolean doRecovery = true;
+
+    if (isLeader) {
+      doRecovery = false;
+      
+      // recover from local transaction log and wait for it to complete before
+      // going active
+      // TODO: should this be moved to another thread? To recoveryStrat?
+      // TODO: should this actually be done earlier, before (or as part of)
+      // leader election perhaps?
+      // TODO: ensure that a replica that is trying to recover waits until I'm
+      // active (or don't make me the leader until my local replay is done).
+      // This replay is only needed on the leader - replicas will do recovery
+      // anyway.
+      
+      UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
+      if (!core.isReloaded() && ulog != null) {
+        Future<UpdateLog.RecoveryInfo> recoveryFuture = core.getUpdateHandler()
+            .getUpdateLog().recoverFromLog();
+        if (recoveryFuture != null) {
+          recoveryFuture.get(); // NOTE: this could potentially block for
+                                // minutes or more!
+          // TODO: publish as recovering in the meantime?
         }
-        // for some reason the shard already exists, though it didn't when we
-        // started registration - just return
-        return;
       }
+      return false;
+    } else {
+      
+      if (core.isReloaded() && !recoverReloadedCores) {
+        doRecovery = false;
+      }
+    }
+    
+    if (doRecovery && !SKIP_AUTO_RECOVERY) {
+      log.info("Core needs to recover:" + core.getName());
+      core.getUpdateHandler().getSolrCoreState().doRecovery(core);
+      return true;
     }
+    
+    return false;
+  }
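
The recoveryFuture.get() above blocks indefinitely while the leader replays
its transaction log, as the inline comment warns. A bounded wait - purely
illustrative, not what this patch does, with an arbitrary timeout - might
look like:

    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    // hypothetical bounded wait on log replay
    static void waitForLogReplay(Future<?> recoveryFuture) throws Exception {
      try {
        recoveryFuture.get(10, TimeUnit.MINUTES);
      } catch (TimeoutException e) {
        // a real implementation would need a policy here, e.g. publish
        // the core as recovering or fail startup
        throw e;
      }
    }
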
+
+
+  public String getBaseUrl() {
+    final String baseUrl = localHostName + ":" + localHostPort + "/" + localHostContext;
+    return baseUrl;
+  }
+
 
+  void publishAsActive(String baseUrl,
+      final CoreDescriptor cd, String shardZkNodeName, String coreName) {
+    Map<String,String> finalProps = new HashMap<String,String>();
+    finalProps.put(ZkStateReader.BASE_URL_PROP, baseUrl);
+    finalProps.put(ZkStateReader.CORE_NAME_PROP, coreName);
+    finalProps.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
+    finalProps.put(ZkStateReader.STATE_PROP, ZkStateReader.ACTIVE);
+
+    publishState(cd, shardZkNodeName, coreName, finalProps);
+  }
+
+  public void publish(SolrCore core, String state) {
+    CoreDescriptor cd = core.getCoreDescriptor();
+    Map<String,String> finalProps = new HashMap<String,String>();
+    finalProps.put(ZkStateReader.BASE_URL_PROP, getBaseUrl());
+    finalProps.put(ZkStateReader.CORE_NAME_PROP, core.getName());
+    finalProps.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
+    finalProps.put(ZkStateReader.STATE_PROP, state);
+    publishState(cd, getNodeName() + "_" + core.getName(),
+        core.getName(), finalProps);
+  }
+  
+  void publishAsDown(String baseUrl,
+      final CoreDescriptor cd, String shardZkNodeName, String coreName) {
+    Map<String,String> finalProps = new HashMap<String,String>();
+    finalProps.put(ZkStateReader.BASE_URL_PROP, baseUrl);
+    finalProps.put(ZkStateReader.CORE_NAME_PROP, coreName);
+    finalProps.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
+    finalProps.put(ZkStateReader.STATE_PROP, ZkStateReader.DOWN);
+ 
+    publishState(cd, shardZkNodeName, coreName, finalProps);
+  }
+  
+  void publishAsRecoveryFailed(String baseUrl,
+      final CoreDescriptor cd, String shardZkNodeName, String coreName) {
+    Map<String,String> finalProps = new HashMap<String,String>();
+    finalProps.put(ZkStateReader.BASE_URL_PROP, baseUrl);
+    finalProps.put(ZkStateReader.CORE_NAME_PROP, coreName);
+    finalProps.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
+    finalProps.put(ZkStateReader.STATE_PROP, ZkStateReader.RECOVERY_FAILED);
+    publishState(cd, shardZkNodeName, coreName, finalProps);
+  }
+
+
+  private boolean needsToBeAssignedShardId(final CoreDescriptor desc,
+      final CloudState state, final String shardZkNodeName) {
+
+    final CloudDescriptor cloudDesc = desc.getCloudDescriptor();
+    
+    final String shardId = state.getShardId(shardZkNodeName);
+
+    if (shardId != null) {
+      cloudDesc.setShardId(shardId);
+      return false;
+    }
+    return true;
   }
 
   /**
@@ -513,16 +672,7 @@ public final class ZkController {
    * @throws InterruptedException
    */
   public void uploadToZK(File dir, String zkPath) throws IOException, KeeperException, InterruptedException {
-    File[] files = dir.listFiles();
-    for(File file : files) {
-      if (!file.getName().startsWith(".")) {
-        if (!file.isDirectory()) {
-          zkClient.setData(zkPath + "/" + file.getName(), file);
-        } else {
-          uploadToZK(file, zkPath + "/" + file.getName());
-        }
-      }
-    }
+    uploadToZK(zkClient, dir, zkPath);
   }
   
   /**
@@ -533,7 +683,7 @@ public final class ZkController {
    * @throws InterruptedException
    */
   public void uploadConfigDir(File dir, String configName) throws IOException, KeeperException, InterruptedException {
-    uploadToZK(dir, ZkController.CONFIGS_ZKNODE + "/" + configName);
+    uploadToZK(zkClient, dir, ZkController.CONFIGS_ZKNODE + "/" + configName);
   }
 
   // convenience for testing
@@ -541,32 +691,6 @@ public final class ZkController {
     zkClient.printLayoutToStdOut();
   }
 
-  private void setUpCollectionsNode() throws KeeperException, InterruptedException {
-    try {
-      if (!zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE)) {
-        if (log.isInfoEnabled()) {
-          log.info("creating zk collections node:" + ZkStateReader.COLLECTIONS_ZKNODE);
-        }
-        // makes collections zkNode if it doesn't exist
-        zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE, CreateMode.PERSISTENT, null);
-      }
-    } catch (KeeperException e) {
-      // its okay if another beats us creating the node
-      if (e.code() != KeeperException.Code.NODEEXISTS) {
-        log.error("", e);
-        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
-            "", e);
-      }
-    } catch (InterruptedException e) {
-      // Restore the interrupted status
-      Thread.currentThread().interrupt();
-      log.error("", e);
-      throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
-          "", e);
-    }
-    
-  }
-
   public void createCollectionZkNode(CloudDescriptor cd) throws KeeperException, InterruptedException, IOException {
     String collection = cd.getCollectionName();
     
@@ -574,12 +698,12 @@ public final class ZkController {
     String collectionPath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection;
     
     try {
-      if(!zkClient.exists(collectionPath)) {
+      if(!zkClient.exists(collectionPath, true)) {
         log.info("Creating collection in ZooKeeper:" + collection);
        SolrParams params = cd.getParams();
 
         try {
-          ZkNodeProps collectionProps = new ZkNodeProps();
+          Map<String,String> collectionProps = new HashMap<String,String>();
           // TODO: if collection.configName isn't set, and there isn't already a conf in zk, just use that?
           String defaultConfigName = System.getProperty(COLLECTION_PARAM_PREFIX+CONFIGNAME_PROP, "configuration1");
 
@@ -595,7 +719,7 @@ public final class ZkController {
 
             // if the config name wasn't passed in, use the default
             if (!collectionProps.containsKey(CONFIGNAME_PROP))
-              collectionProps.put(CONFIGNAME_PROP,  defaultConfigName);
+              getConfName(collection, collectionPath, collectionProps);
             
           } else if(System.getProperty("bootstrap_confdir") != null) {
             // if we are bootstrapping a collection, default the config for
@@ -614,32 +738,14 @@ public final class ZkController {
               collectionProps.put(CONFIGNAME_PROP,  defaultConfigName);
 
           } else {
-            // check for configName
-            log.info("Looking for collection configName");
-            int retry = 1;
-            for (; retry < 6; retry++) {
-              if (zkClient.exists(collectionPath)) {
-                collectionProps = new ZkNodeProps();
-                collectionProps.load(zkClient.getData(collectionPath, null, null));
-                if (collectionProps.containsKey(CONFIGNAME_PROP)) {
-                  break;
-                }
-              }
-              log.info("Could not find collection configName - pausing for 2 seconds and trying again - try: " + retry);
-              Thread.sleep(2000);
-            }
-            if (retry == 6) {
-              log.error("Could not find configName for collection " + collection);
-              throw new ZooKeeperException(
-                  SolrException.ErrorCode.SERVER_ERROR,
-                  "Could not find configName for collection " + collection);
-            }
+            getConfName(collection, collectionPath, collectionProps);
           }
           
-          zkClient.makePath(collectionPath, collectionProps.store(), CreateMode.PERSISTENT, null, true);
+          ZkNodeProps zkProps = new ZkNodeProps(collectionProps);
+          zkClient.makePath(collectionPath, ZkStateReader.toJSON(zkProps), CreateMode.PERSISTENT, null, true);
          
           // ping that there is a new collection
-          zkClient.setData(ZkStateReader.COLLECTIONS_ZKNODE, (byte[])null);
+          zkClient.setData(ZkStateReader.COLLECTIONS_ZKNODE, (byte[])null, true);
         } catch (KeeperException e) {
          // it's okay if the node already exists
           if (e.code() != KeeperException.Code.NODEEXISTS) {
@@ -658,9 +764,131 @@ public final class ZkController {
     }
     
   }
+
+
+  private void getConfName(String collection, String collectionPath,
+      Map<String,String> collectionProps) throws KeeperException,
+      InterruptedException {
+    // check for configName
+    log.info("Looking for collection configName");
+    int retry = 1;
+    for (; retry < 6; retry++) {
+      if (zkClient.exists(collectionPath, true)) {
+        ZkNodeProps cProps = ZkNodeProps.load(zkClient.getData(collectionPath, null, null, true));
+        if (cProps.containsKey(CONFIGNAME_PROP)) {
+          break;
+        }
+      }
+      // if there is only one conf, use that
+      List<String> configNames = zkClient.getChildren(CONFIGS_ZKNODE, null, true);
+      if (configNames.size() == 1) {
+        // no config set named, but there is only 1 - use it
+        log.info("Only one config set found in zk - using it:" + configNames.get(0));
+        collectionProps.put(CONFIGNAME_PROP,  configNames.get(0));
+        break;
+      }
+      log.info("Could not find collection configName - pausing for 2 seconds and trying again - try: " + retry);
+      Thread.sleep(2000);
+    }
+    if (retry == 6) {
+      log.error("Could not find configName for collection " + collection);
+      throw new ZooKeeperException(
+          SolrException.ErrorCode.SERVER_ERROR,
+          "Could not find configName for collection " + collection);
+    }
+  }
   
   public ZkStateReader getZkStateReader() {
     return zkStateReader;
   }
 
+  
+  private void publishState(CoreDescriptor cd, String shardZkNodeName, String coreName,
+      Map<String,String> props) {
+    CloudDescriptor cloudDesc = cd.getCloudDescriptor();
+    
+    if (cloudDesc.getShardId() == null && needsToBeAssignedShardId(cd, zkStateReader.getCloudState(), shardZkNodeName)) {
+      // publish with no shard id so we are assigned one, and then look for it
+      doPublish(shardZkNodeName, coreName, props, cloudDesc);
+      String shardId;
+      try {
+        shardId = doGetShardIdProcess(coreName, cloudDesc);
+      } catch (InterruptedException e) {
+        // Restore the interrupted status
+        Thread.currentThread().interrupt();
+        throw new SolrException(ErrorCode.SERVER_ERROR, "Interrupted", e);
+      }
+      cloudDesc.setShardId(shardId);
+    }
+   
+    
+    if (!props.containsKey(ZkStateReader.SHARD_ID_PROP) && cloudDesc.getShardId() != null) {
+      props.put(ZkStateReader.SHARD_ID_PROP, cloudDesc.getShardId());
+    }
+    
+    doPublish(shardZkNodeName, coreName, props, cloudDesc);
+  }
+
+
+  private void doPublish(String shardZkNodeName, String coreName,
+      Map<String,String> props, CloudDescriptor cloudDesc) {
+
+    CoreState coreState = new CoreState(coreName,
+        cloudDesc.getCollectionName(), props);
+    coreStates.put(shardZkNodeName, coreState);
+    final String nodePath = "/node_states/" + getNodeName();
+
+    try {
+      zkClient.setData(nodePath, ZkStateReader.toJSON(coreStates.values()),
+          true);
+      
+    } catch (KeeperException e) {
+      throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+          "could not publish node state", e);
+    } catch (InterruptedException e) {
+      // Restore the interrupted status
+      Thread.currentThread().interrupt();
+      throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+          "could not publish node state", e);
+    }
+  }
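
doPublish() aggregates all of this node's core states into a single JSON
payload under /node_states/<nodeName>. Reading the payload back uses the same
client call seen elsewhere in this patch; this fragment assumes an open
SolrZkClient as used throughout the class:

    // sketch: fetch the JSON-encoded core states this node last published,
    // via the getData(path, watcher, stat, retryOnConnLoss) signature
    // that appears throughout this patch
    byte[] data = zkClient.getData("/node_states/" + getNodeName(),
        null, null, true);
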
+
+  private String doGetShardIdProcess(String coreName, CloudDescriptor descriptor)
+      throws InterruptedException {
+    final String shardZkNodeName = getNodeName() + "_" + coreName;
+    int retryCount = 120;
+    while (retryCount-- > 0) {
+      final String shardId = zkStateReader.getCloudState().getShardId(
+          shardZkNodeName);
+      if (shardId != null) {
+        return shardId;
+      }
+      Thread.sleep(500);
+    }
+    throw new SolrException(ErrorCode.SERVER_ERROR,
+        "Could not get shard_id for core: " + coreName);
+  }
+  
+  public static void uploadToZK(SolrZkClient zkClient, File dir, String zkPath) throws IOException, KeeperException, InterruptedException {
+    File[] files = dir.listFiles();
+    if (files == null) {
+      throw new IllegalArgumentException("Illegal directory: " + dir);
+    }
+    for(File file : files) {
+      if (!file.getName().startsWith(".")) {
+        if (!file.isDirectory()) {
+          zkClient.makePath(zkPath + "/" + file.getName(), file, false, true);
+        } else {
+          uploadToZK(zkClient, file, zkPath + "/" + file.getName());
+        }
+      }
+    }
+  }
+  
+  public static void uploadConfigDir(SolrZkClient zkClient, File dir, String configName) throws IOException, KeeperException, InterruptedException {
+    uploadToZK(zkClient, dir, ZkController.CONFIGS_ZKNODE + "/" + configName);
+  }
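
The static variants let tests and tools push a config directory without
constructing a full ZkController. A usage sketch, where the connect string,
timeout, directory, and config name are all assumptions:

    import java.io.File;
    import org.apache.solr.common.cloud.SolrZkClient;

    public class UploadConf {
      public static void main(String[] args) throws Exception {
        // assumes SolrZkClient exposes a (connectString, timeoutMs)
        // constructor as used elsewhere in Solr; values are examples only
        SolrZkClient zkClient = new SolrZkClient("localhost:2181", 10000);
        try {
          ZkController.uploadConfigDir(zkClient, new File("solr/conf"),
              "configuration1");
        } finally {
          zkClient.close();
        }
      }
    }
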
+
 }


