lucene-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yo...@apache.org
Subject svn commit: r1208673 - in /lucene/dev/branches/solrcloud/solr/core/src: java/org/apache/solr/update/ java/org/apache/solr/update/processor/ test/org/apache/solr/search/
Date Wed, 30 Nov 2011 18:35:24 GMT
Author: yonik
Date: Wed Nov 30 18:35:23 2011
New Revision: 1208673

URL: http://svn.apache.org/viewvc?rev=1208673&view=rev
Log:
buffer deleteByQuery

Modified:
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/search/TestRecovery.java

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java?rev=1208673&r1=1208672&r2=1208673&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java Wed Nov 30 18:35:23 2011
@@ -487,6 +487,7 @@ public class FSUpdateLog extends UpdateL
 
   @Override
   public Future<RecoveryInfo> recoverFromLog() {
+    recoveryInfo = new RecoveryInfo();
     if (tlogFiles.length == 0) return null;
     TransactionLog oldTlog = null;
 
@@ -530,6 +531,7 @@ public class FSUpdateLog extends UpdateL
   @Override
   public void bufferUpdates() {
     assert state == State.ACTIVE;
+    recoveryInfo = new RecoveryInfo();
 
     // block all updates to eliminate race conditions
     // reading state and acting on it in the update processor
@@ -671,6 +673,7 @@ public class FSUpdateLog extends UpdateL
             switch (oper) {
               case UpdateLog.ADD:
               {
+                recoveryInfo.adds++;
                 // byte[] idBytes = (byte[]) entry.get(2);
                 SolrInputDocument sdoc = (SolrInputDocument)entry.get(entry.size()-1);
                 AddUpdateCommand cmd = new AddUpdateCommand(req);
@@ -686,6 +689,7 @@ public class FSUpdateLog extends UpdateL
               }
               case UpdateLog.DELETE:
               {
+                recoveryInfo.deletes++;
                 byte[] idBytes = (byte[]) entry.get(2);
                 DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
                 cmd.setIndexedId(new BytesRef(idBytes));
@@ -697,6 +701,7 @@ public class FSUpdateLog extends UpdateL
 
               case UpdateLog.DELETE_BY_QUERY:
               {
+                recoveryInfo.deleteByQuery++;
                 String query = (String)entry.get(2);
                 DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
                 cmd.query = query;
@@ -717,12 +722,15 @@ public class FSUpdateLog extends UpdateL
                 throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,  "Unknown Operation! " + oper);
             }
           } catch (IOException ex) {
+            recoveryInfo.errors++;
             log.warn("IOException reading log", ex);
             // could be caused by an incomplete flush if recovering from log
           } catch (ClassCastException cl) {
-            log.warn("Unexpected log entry or corrupt log.  Entry=" + o);
+            recoveryInfo.errors++;
+            log.warn("Unexpected log entry or corrupt log.  Entry=" + o, cl);
             // would be caused by a corrupt transaction log
           } catch (Exception ex) {
+            recoveryInfo.errors++;
             log.warn("Exception replaying log", ex);
             // something wrong with the request?
           }
@@ -736,12 +744,14 @@ public class FSUpdateLog extends UpdateL
         try {
           uhandler.commit(cmd);
         } catch (IOException ex) {
+          recoveryInfo.errors++;
           log.error("Replay exception: final commit.", ex);
         }
 
         try {
           proc.finish();
         } catch (IOException ex) {
+          recoveryInfo.errors++;
           log.error("Replay exception: finish()", ex);
         }
 
@@ -749,6 +759,7 @@ public class FSUpdateLog extends UpdateL
         translog.decref();
 
       } catch (Exception e) {
+        recoveryInfo.errors++;
         SolrException.log(log,e);
       } finally {
         // change the state while updates are still blocked to prevent races

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java?rev=1208673&r1=1208672&r2=1208673&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java Wed Nov 30 18:35:23 2011
@@ -314,6 +314,10 @@ public class TransactionLog {
 
   /* This method is thread safe */
   public Object lookup(long pos) {
+    // A negative position can result from a log replay (which does not re-log, but does
+    // update the version map).  This is OK since the node won't be ACTIVE when this happens.
+    if (pos < 0) return null;
+
     try {
       // make sure any unflushed buffer has been flushed
       synchronized (fos) {

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java?rev=1208673&r1=1208672&r2=1208673&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java Wed Nov 30 18:35:23 2011
@@ -58,5 +58,9 @@ public abstract class UpdateLog implemen
 
 
   public static class RecoveryInfo {
+    public int adds;
+    public int deletes;
+    public int deleteByQuery;
+    public int errors;
   }
 }

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1208673&r1=1208672&r2=1208673&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Wed Nov 30 18:35:23 2011
@@ -372,14 +372,17 @@ public class DistributedUpdateProcessor 
   
   @Override
   public void processDelete(DeleteUpdateCommand cmd) throws IOException {
-    int hash = 0;
-    if (cmd.getIndexedId() == null) {
+    if (!cmd.isDeleteById()) {
       // delete by query...
       // TODO: handle versioned and distributed deleteByQuery
-      super.processDelete(cmd);
-    } else {
-      hash = hash(cmd);
+
+      // even in non zk mode, tests simulate updates from a leader
+      isLeader = !req.getParams().getBool(SEEN_LEADER, false);
+      processDeleteByQuery(cmd);
+      return;
     }
+
+    int hash = hash(cmd);
     if (zkEnabled) {
       shards = setupRequest(hash);
     } else {
@@ -496,7 +499,59 @@ public class DistributedUpdateProcessor 
       vinfo.unlockForUpdate();
     }
   }
-  
+
+  private void processDeleteByQuery(DeleteUpdateCommand cmd) throws IOException {
+    if (vinfo == null) {
+      super.processDelete(cmd);
+      return;
+    }
+
+    // at this point, there is an update we need to try and apply.
+    // we may or may not be the leader.
+
+    // Find the version
+    long versionOnUpdate = cmd.getVersion();
+    if (versionOnUpdate == 0) {
+      String versionOnUpdateS = req.getParams().get(VERSION_FIELD);
+      versionOnUpdate = versionOnUpdateS == null ? 0 : Long.parseLong(versionOnUpdateS);
+    }
+    versionOnUpdate = Math.abs(versionOnUpdate);  // normalize to positive version
+
+    boolean isReplay = (cmd.getFlags() & UpdateCommand.REPLAY) != 0;
+    boolean leaderLogic = isLeader && !isReplay;
+
+    if (!leaderLogic && versionOnUpdate==0) {
+      throw new SolrException(ErrorCode.BAD_REQUEST, "missing _version_ on update from leader");
+    }
+
+    vinfo.blockUpdates();
+    try {
+
+      if (versionsStored) {
+        if (leaderLogic) {
+          long version = vinfo.getNewClock();
+          cmd.setVersion(-version);
+          // TODO update versions in all buckets
+        } else {
+          cmd.setVersion(-versionOnUpdate);
+
+          if (ulog.getState() != UpdateLog.State.ACTIVE && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+            // we're not in an active state, and this update isn't from a replay, so buffer it.
+            cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
+            ulog.deleteByQuery(cmd);
+            return;
+          }
+        }
+      }
+
+      doLocalDelete(cmd);
+
+    } finally {
+      vinfo.unblockUpdates();
+    }
+
+  }
+
   @Override
   public void processCommit(CommitUpdateCommand cmd) throws IOException {
 

Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/search/TestRecovery.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/search/TestRecovery.java?rev=1208673&r1=1208672&r2=1208673&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/search/TestRecovery.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/search/TestRecovery.java Wed Nov 30 18:35:23 2011
@@ -229,6 +229,12 @@ public class TestRecovery extends SolrTe
       deleteAndGetVersion("4", params(SEEN_LEADER,SEEN_LEADER_VAL, "_version_","-94"));  // this update should not take affect
       updateJ(jsonAdd(sdoc("id","6", "_version_","106")), params(SEEN_LEADER,SEEN_LEADER_VAL));
       updateJ(jsonAdd(sdoc("id","5", "_version_","105")), params(SEEN_LEADER,SEEN_LEADER_VAL));
+      updateJ(jsonAdd(sdoc("id","8", "_version_","108")), params(SEEN_LEADER,SEEN_LEADER_VAL));
+
+      // test that delete by query is at least buffered along with everything else so it will delete the
+      // currently buffered id:8 (even if it doesn't currently support versioning)
+      updateJ("{\"delete\": { \"query\":\"id:2 OR id:8\" }}", params(SEEN_LEADER,SEEN_LEADER_VAL, "_version_","-300"));
+
 
       logReplay.drainPermits();
       rinfoFuture = ulog.applyBufferedUpdates();
@@ -247,18 +253,19 @@ public class TestRecovery extends SolrTe
       deleteAndGetVersion("6", params(SEEN_LEADER,SEEN_LEADER_VAL, "_version_","-206"));
 
       logReplay.release(1000);
-      if (rinfoFuture != null) rinfoFuture.get();
+      UpdateLog.RecoveryInfo recInfo = rinfoFuture.get();
 
       assertJQ(req("q", "*:*", "sort","id asc", "fl","id,_version_")
           , "/response/docs==["
-                           +  "{'id':'2','_version_':102}"
-                           + ",{'id':'3','_version_':103}"
+                           + "{'id':'3','_version_':103}"
                            + ",{'id':'4','_version_':104}"
                            + ",{'id':'5','_version_':105}"
                            + ",{'id':'7','_version_':107}"
                            +"]"
       );
 
+      assertEquals(1, recInfo.deleteByQuery);
+
       assertEquals(UpdateLog.State.ACTIVE, ulog.getState()); // leave each test method in a good state
     } finally {
       FSUpdateLog.testing_logReplayHook = null;



Mime
View raw message