phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tdsi...@apache.org
Subject [24/46] phoenix git commit: PHOENIX-3789 Addendum to execute cross region index maintenance calls in postBatchMutateIndispensably
Date Wed, 10 May 2017 18:04:23 GMT
PHOENIX-3789 Addendum to execute cross region index maintenance calls in postBatchMutateIndispensably


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/ee886bab
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/ee886bab
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/ee886bab

Branch: refs/heads/omid
Commit: ee886bab91f58f4b54efc619d2ec3aec92d40a92
Parents: 8309b22
Author: James Taylor <jamestaylor@apache.org>
Authored: Mon Apr 17 14:53:41 2017 -0700
Committer: James Taylor <jamestaylor@apache.org>
Committed: Mon Apr 17 19:12:02 2017 -0700

----------------------------------------------------------------------
 .../org/apache/phoenix/hbase/index/Indexer.java | 39 ++++++++++++++++----
 1 file changed, 31 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/ee886bab/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
index f485bdf..de98051 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
@@ -372,7 +372,7 @@ public class Indexer extends BaseRegionObserver {
       super.postPut(e, put, edit, durability);
           return;
         }
-    doPost(edit, put, durability, true);
+    doPost(edit, put, durability, true, false);
   }
 
   @Override
@@ -382,10 +382,29 @@ public class Indexer extends BaseRegionObserver {
       super.postDelete(e, delete, edit, durability);
           return;
         }
-    doPost(edit, delete, durability, true);
+    doPost(edit, delete, durability, true, false);
   }
 
   @Override
+  public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
+      MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+      if (this.disabled) {
+        super.postBatchMutate(c, miniBatchOp);
+        return;
+      }
+      WALEdit edit = miniBatchOp.getWalEdit(0);
+      if (edit != null) {
+        IndexedKeyValue ikv = getFirstIndexedKeyValue(edit);
+        if (ikv != null) {
+          // This will prevent the postPut and postDelete hooks from doing anything
+          // We need to do this now, as the postBatchMutateIndispensably (where the
+          // actual index writing gets done) is called after the postPut and postDelete.
+          ikv.markBatchFinished();
+        }
+      }
+  }
+  
+  @Override
   public void postBatchMutateIndispensably(ObserverContext<RegionCoprocessorEnvironment> c,
       MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success) throws IOException {
       if (this.disabled) {
@@ -398,13 +417,17 @@ public class Indexer extends BaseRegionObserver {
         //each batch operation, only the first one will have anything useful, so we can just grab that
         Mutation mutation = miniBatchOp.getOperation(0);
         WALEdit edit = miniBatchOp.getWalEdit(0);
-        doPost(edit, mutation, mutation.getDurability(), false);
+        // We're forcing the index writes here because we've marked the index batch as "finished"
+        // to prevent postPut and postDelete from doing anything, but hold off on writing them
+        // until now so we're outside of the MVCC lock (see PHOENIX-3789). Without this hacky
+        // forceWrite flag, we'd ignore them again here too.
+        doPost(edit, mutation, mutation.getDurability(), false, true);
     }
   }
 
-  private void doPost(WALEdit edit, Mutation m, final Durability durability, boolean allowLocalUpdates) throws IOException {
+  private void doPost(WALEdit edit, Mutation m, final Durability durability, boolean allowLocalUpdates, boolean forceWrite) throws IOException {
     try {
-      doPostWithExceptions(edit, m, durability, allowLocalUpdates);
+      doPostWithExceptions(edit, m, durability, allowLocalUpdates, forceWrite);
       return;
     } catch (Throwable e) {
       rethrowIndexingException(e);
@@ -413,7 +436,7 @@ public class Indexer extends BaseRegionObserver {
         "Somehow didn't complete the index update, but didn't return succesfully either!");
   }
 
-  private void doPostWithExceptions(WALEdit edit, Mutation m, final Durability durability, boolean allowLocalUpdates)
+  private void doPostWithExceptions(WALEdit edit, Mutation m, final Durability durability, boolean allowLocalUpdates, boolean forceWrite)
           throws Exception {
       //short circuit, if we don't need to do any work
       if (durability == Durability.SKIP_WAL || !this.builder.isEnabled(m) || edit == null) {
@@ -447,14 +470,14 @@ public class Indexer extends BaseRegionObserver {
            * once (this hook gets called with the same WALEdit for each Put/Delete in a batch, which can
            * lead to writing all the index updates for each Put/Delete).
            */
-          if (!ikv.getBatchFinished() || allowLocalUpdates) {
+          if ((!ikv.getBatchFinished() || forceWrite) || allowLocalUpdates) {
               Collection<Pair<Mutation, byte[]>> indexUpdates = extractIndexUpdate(edit);
 
               // the WAL edit is kept in memory and we already specified the factory when we created the
               // references originally - therefore, we just pass in a null factory here and use the ones
               // already specified on each reference
               try {
-            	  if (!ikv.getBatchFinished()) {
+            	  if (!ikv.getBatchFinished() || forceWrite) {
             		  current.addTimelineAnnotation("Actually doing index update for first time");
             		  writer.writeAndKillYourselfOnFailure(indexUpdates, allowLocalUpdates);
             	  } else if (allowLocalUpdates) {


Mime
View raw message