asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ti...@apache.org
Subject asterixdb git commit: Fix Upsert to Never Enforce the First Operation
Date Thu, 03 Nov 2016 06:39:03 GMT
Repository: asterixdb
Updated Branches:
  refs/heads/master a9c74b6b4 -> d1f6121af


Fix Upsert to Never Enforce the First Operation

Change-Id: I8ec784e2d6ff39758ab701d4f36fc85c278178f2
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1336
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/d1f6121a
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/d1f6121a
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/d1f6121a

Branch: refs/heads/master
Commit: d1f6121af04ee99ead01af839ec421050abd78f0
Parents: a9c74b6
Author: Abdullah Alamoudi <bamousaa@gmail.com>
Authored: Wed Nov 2 19:36:08 2016 -0700
Committer: Till Westmann <tillw@apache.org>
Committed: Wed Nov 2 23:38:40 2016 -0700

----------------------------------------------------------------------
 ...rixLSMPrimaryUpsertOperatorNodePushable.java | 21 ++++++++++++++++----
 .../LockThenSearchOperationCallback.java        |  7 ++++++-
 2 files changed, 23 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d1f6121a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
index 96f9e76..536366f 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
@@ -24,12 +24,14 @@ import java.nio.ByteBuffer;
 
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.common.dataflow.AsterixLSMIndexUtil;
+import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.transactions.ILogMarkerCallback;
 import org.apache.asterix.common.transactions.PrimaryIndexLogMarkerCallback;
 import org.apache.asterix.om.pointables.nonvisitor.ARecordPointable;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.transaction.management.opcallbacks.LockThenSearchOperationCallback;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IMissingWriter;
@@ -80,6 +82,7 @@ public class AsterixLSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertU
     private final boolean hasMeta;
     private final int filterFieldIndex;
     private final int metaFieldIndex;
+    private LockThenSearchOperationCallback searchCallback;
 
     public AsterixLSMPrimaryUpsertOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext
ctx,
             int partition, int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider,
int numOfPrimaryKeys,
@@ -140,8 +143,9 @@ public class AsterixLSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertU
             modCallback = opDesc.getModificationOpCallbackFactory().createModificationOperationCallback(
                     indexHelper.getResourcePath(), indexHelper.getResourceID(), indexHelper.getResourcePartition(),
                     index, ctx, this);
-            indexAccessor = index.createAccessor(modCallback, opDesc.getSearchOpCallbackFactory()
-                    .createSearchOperationCallback(indexHelper.getResourceID(), ctx, this));
+            searchCallback = (LockThenSearchOperationCallback) opDesc.getSearchOpCallbackFactory()
+                    .createSearchOperationCallback(indexHelper.getResourceID(), ctx, this);
+            indexAccessor = index.createAccessor(modCallback, searchCallback);
             cursor = indexAccessor.createSearchCursor(false);
             frameTuple = new FrameTupleReference();
             IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
@@ -167,6 +171,12 @@ public class AsterixLSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertU
         }
         if (recordWasInserted || recordWasDeleted) {
             FrameUtils.appendToWriter(writer, appender, tb.getFieldEndOffsets(), tb.getByteArray(),
0, tb.getSize());
+        } else {
+            try {
+                searchCallback.release();
+            } catch (ACIDException e) {
+                throw new HyracksDataException(e);
+            }
         }
     }
 
@@ -185,6 +195,7 @@ public class AsterixLSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertU
         accessor.reset(buffer);
         LSMTreeIndexAccessor lsmAccessor = (LSMTreeIndexAccessor) indexAccessor;
         int tupleCount = accessor.getTupleCount();
+        boolean firstModification = true;
         int i = 0;
         try {
             while (i < tupleCount) {
@@ -217,8 +228,9 @@ public class AsterixLSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertU
                         tb.addFieldEndOffset();
                     }
                     modCallback.setOp(Operation.DELETE);
-                    if (i == 0) {
+                    if (firstModification) {
                         lsmAccessor.delete(prevTuple);
+                        firstModification = false;
                     } else {
                         lsmAccessor.forceDelete(prevTuple);
                     }
@@ -236,8 +248,9 @@ public class AsterixLSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertU
                 }
                 if (!isNull(tuple, numOfPrimaryKeys)) {
                     modCallback.setOp(Operation.INSERT);
-                    if ((prevTuple == null) && (i == 0)) {
+                    if (firstModification) {
                         lsmAccessor.insert(tuple);
+                        firstModification = false;
                     } else {
                         lsmAccessor.forceInsert(tuple);
                     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d1f6121a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java
index ef3b218..288e7a5 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java
@@ -42,6 +42,7 @@ public class LockThenSearchOperationCallback extends AbstractOperationCallback
i
     private final LSMIndexInsertUpdateDeleteOperatorNodePushable operatorNodePushable;
     private final ILogManager logManager;
     private final ILogRecord logRecord;
+    private int pkHash;
 
     public LockThenSearchOperationCallback(int datasetId, int[] entityIdFields, ITransactionSubsystem
txnSubsystem,
             ITransactionContext txnCtx, IOperatorNodePushable operatorNodePushable) {
@@ -75,7 +76,7 @@ public class LockThenSearchOperationCallback extends AbstractOperationCallback
i
 
     @Override
     public void before(ITupleReference tuple) throws HyracksDataException {
-        int pkHash = computePrimaryKeyHashValue(tuple, primaryKeyFields);
+        pkHash = computePrimaryKeyHashValue(tuple, primaryKeyFields);
         try {
             if (operatorNodePushable != null) {
 
@@ -122,4 +123,8 @@ public class LockThenSearchOperationCallback extends AbstractOperationCallback
i
     private void logWait() throws ACIDException {
         logManager.log(logRecord);
     }
+
+    public void release() throws ACIDException {
+        lockManager.unlock(datasetId, pkHash, LockMode.X, txnCtx);
+    }
 }


Mime
View raw message