asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kiss...@apache.org
Subject [6/6] incubator-asterixdb git commit: Deadlock-free locking protocol is enabled
Date Fri, 06 May 2016 16:23:15 GMT
Deadlock-free locking protocol is enabled

- Added EntityCommitProfiler class in TransactionSubsystem.java file:
This profiler takes a report interval parameter (in seconds) and
reports the entity-level commit count once per report interval,
only if IS_PROFILE_MODE is set to true. The profiler runs in a separate
thread. However, the profiler thread doesn't start reporting the count
until entityCommitCount > 0. The profiler can be used to measure
1) IPS (Inserts Per Second) and
2) IIPS (instantaneous IPS) for every report interval.

Change-Id: Ie58ae2f519baa53599e99b51bd61ea5f8366dafd
Reviewed-on: https://asterix-gerrit.ics.uci.edu/825
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <hubailmor@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/commit/23be9068
Tree: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/tree/23be9068
Diff: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/diff/23be9068

Branch: refs/heads/master
Commit: 23be90686bc60b6455bdb7d1ccf3084e14c6cd7d
Parents: 7c15c13
Author: kisskys <kisskys@apache.org>
Authored: Thu May 5 23:27:28 2016 -0700
Committer: Young-Seok Kim <kisskys@gmail.com>
Committed: Fri May 6 09:22:17 2016 -0700

----------------------------------------------------------------------
 .../asterix/optimizer/base/RuleCollections.java |    2 -
 .../IntroduceInstantLockSearchCallbackRule.java |  156 --
 .../config/AsterixTransactionProperties.java    |    8 +
 ...erixLSMInsertDeleteOperatorNodePushable.java |   50 +-
 .../AbstractOperationCallbackFactory.java       |    1 -
 .../common/transactions/ILockManager.java       |    2 -
 .../asterix/common/transactions/ILogRecord.java |    1 +
 .../asterix/common/transactions/LogRecord.java  |   28 +-
 .../asterix/common/transactions/LogType.java    |    4 +
 .../indexing/ExternalFileIndexAccessor.java     |    2 +-
 ...ExternalBTreeSearchOperatorNodePushable.java |    2 +-
 ...ExternalRTreeSearchOperatorNodePushable.java |    2 +-
 .../apache/asterix/metadata/MetadataNode.java   |   25 +-
 .../asterix/metadata/api/IMetadataNode.java     |   75 +-
 .../metadata/declared/AqlMetadataProvider.java  |   19 +-
 ...rixLSMPrimaryUpsertOperatorNodePushable.java |   14 +-
 .../LockThenSearchOperationCallback.java        |   72 +-
 .../LockThenSearchOperationCallbackFactory.java |    9 +-
 ...exInstantSearchOperationCallbackFactory.java |    3 +-
 ...imaryIndexModificationOperationCallback.java |   52 +-
 ...dexModificationOperationCallbackFactory.java |    6 +-
 ...maryIndexSearchOperationCallbackFactory.java |    3 +-
 ...dexModificationOperationCallbackFactory.java |    4 +-
 ...daryIndexSearchOperationCallbackFactory.java |    3 +-
 ...dexModificationOperationCallbackFactory.java |    4 +-
 ...dexModificationOperationCallbackFactory.java |    4 +-
 .../UpsertOperationCallbackFactory.java         |    4 +-
 .../service/locking/ConcurrentLockManager.java  |  204 +-
 .../service/locking/DatasetLockInfo.java        |  536 ----
 .../service/locking/DeadlockDetector.java       |  255 --
 .../service/locking/DummyLockManager.java       |   93 -
 .../service/locking/DumpTablePrinter.java       |   15 +-
 .../service/locking/EntityInfoManager.java      |  730 ------
 .../service/locking/EntityLockInfoManager.java  |  827 -------
 .../service/locking/ILockHashTable.java         |   39 -
 .../management/service/locking/ILockMatrix.java |   48 -
 .../management/service/locking/JobInfo.java     |  334 ---
 .../management/service/locking/LockManager.java | 2289 ------------------
 .../LockManagerDeterministicUnitTest.java       |  664 -----
 .../locking/LockManagerRandomUnitTest.java      |  636 -----
 .../service/locking/LockRequestTracker.java     |   75 -
 .../management/service/locking/LockWaiter.java  |  149 --
 .../service/locking/LockWaiterManager.java      |  403 ---
 .../service/locking/TimeOutDetector.java        |  101 -
 .../management/service/logging/LogBuffer.java   |   41 +-
 .../management/service/logging/LogManager.java  |    4 +-
 .../logging/LogManagerWithReplication.java      |   18 +-
 .../service/recovery/RecoveryManager.java       |    6 +-
 .../TransactionManagementConstants.java         |   45 +-
 .../service/transaction/TransactionManager.java |    2 +-
 .../transaction/TransactionSubsystem.java       |   76 +-
 .../service/locking/LockManagerUnitTest.java    |  124 +-
 .../management/service/locking/Locker.java      |   39 +-
 .../management/service/locking/Request.java     |   15 +-
 .../storage/am/btree/test/FramewriterTest.java  |    2 +-
 .../IModificationOperationCallbackFactory.java  |    4 +-
 .../api/ISearchOperationCallbackFactory.java    |    3 +-
 ...xInsertUpdateDeleteOperatorNodePushable.java |    2 +-
 .../IndexSearchOperatorNodePushable.java        |    2 +-
 ...eIndexDiskOrderScanOperatorNodePushable.java |    2 +-
 .../impls/NoOpOperationCallbackFactory.java     |   10 +-
 .../am/lsm/common/api/ILSMIndexFrameWriter.java |   49 +
 ...xInsertUpdateDeleteOperatorNodePushable.java |    7 +-
 .../lsm/common/impls/ConstantMergePolicy.java   |    1 -
 64 files changed, 638 insertions(+), 7767 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
index a5fa9a6..814c570 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
@@ -42,7 +42,6 @@ import org.apache.asterix.optimizer.rules.IntroduceAutogenerateIDRule;
 import org.apache.asterix.optimizer.rules.IntroduceDynamicTypeCastForExternalFunctionRule;
 import org.apache.asterix.optimizer.rules.IntroduceDynamicTypeCastRule;
 import org.apache.asterix.optimizer.rules.IntroduceEnforcedListTypeRule;
-import org.apache.asterix.optimizer.rules.IntroduceInstantLockSearchCallbackRule;
 import org.apache.asterix.optimizer.rules.IntroduceMaterializationForInsertWithSelfScanRule;
 import org.apache.asterix.optimizer.rules.IntroduceRandomPartitioningFeedComputationRule;
 import org.apache.asterix.optimizer.rules.IntroduceRapidFrameFlushProjectAssignRule;
@@ -291,7 +290,6 @@ public final class RuleCollections {
         physicalRewritesAllLevels.add(new ReplaceSinkOpWithCommitOpRule());
         physicalRewritesAllLevels.add(new SetAlgebricksPhysicalOperatorsRule());
         physicalRewritesAllLevels.add(new SetAsterixPhysicalOperatorsRule());
-        physicalRewritesAllLevels.add(new IntroduceInstantLockSearchCallbackRule());
         physicalRewritesAllLevels.add(new AddEquivalenceClassForRecordConstructorRule());
         physicalRewritesAllLevels.add(new EnforceStructuralPropertiesRule());
         physicalRewritesAllLevels.add(new RemoveSortInFeedIngestionRule());

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceInstantLockSearchCallbackRule.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceInstantLockSearchCallbackRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceInstantLockSearchCallbackRule.java
deleted file mode 100644
index a3d0640..0000000
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceInstantLockSearchCallbackRule.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.optimizer.rules;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.asterix.algebra.operators.CommitOperator;
-import org.apache.asterix.algebra.operators.physical.BTreeSearchPOperator;
-import org.apache.asterix.metadata.declared.AqlDataSource;
-import org.apache.asterix.metadata.declared.AqlMetadataImplConfig;
-import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
-import org.apache.asterix.optimizer.rules.am.AccessMethodJobGenParams;
-import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Triple;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import org.apache.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
-import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
-import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.physical.DataSourceScanPOperator;
-import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
-
-public class IntroduceInstantLockSearchCallbackRule implements IAlgebraicRewriteRule {
-
-    @Override
-    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
-            throws AlgebricksException {
-        return false;
-    }
-
-    private void extractDataSourcesInfo(AbstractLogicalOperator op,
-            Map<String, Triple<Integer, LogicalOperatorTag, IPhysicalOperator>> dataSourcesMap) {
-
-        for (int i = 0; i < op.getInputs().size(); ++i) {
-            AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) op.getInputs().get(i).getValue();
-
-            if (descendantOp.getOperatorTag() == LogicalOperatorTag.UNNEST_MAP) {
-                UnnestMapOperator unnestMapOp = (UnnestMapOperator) descendantOp;
-                ILogicalExpression unnestExpr = unnestMapOp.getExpressionRef().getValue();
-                if (unnestExpr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
-                    AbstractFunctionCallExpression f = (AbstractFunctionCallExpression) unnestExpr;
-                    FunctionIdentifier fid = f.getFunctionIdentifier();
-                    if (fid.equals(AsterixBuiltinFunctions.EXTERNAL_LOOKUP)) {
-                        return;
-                    }
-                    if (!fid.equals(AsterixBuiltinFunctions.INDEX_SEARCH)) {
-                        throw new IllegalStateException();
-                    }
-                    AccessMethodJobGenParams jobGenParams = new AccessMethodJobGenParams();
-                    jobGenParams.readFromFuncArgs(f.getArguments());
-                    boolean isPrimaryIndex = jobGenParams.isPrimaryIndex();
-                    String indexName = jobGenParams.getIndexName();
-                    if (isPrimaryIndex) {
-                        if (dataSourcesMap.containsKey(indexName)) {
-                            ++(dataSourcesMap.get(indexName).first);
-                        } else {
-                            dataSourcesMap.put(indexName, new Triple<Integer, LogicalOperatorTag, IPhysicalOperator>(1,
-                                    LogicalOperatorTag.UNNEST_MAP, unnestMapOp.getPhysicalOperator()));
-                        }
-                    }
-                }
-            } else if (descendantOp.getOperatorTag() == LogicalOperatorTag.DATASOURCESCAN) {
-                DataSourceScanOperator dataSourceScanOp = (DataSourceScanOperator) descendantOp;
-                String datasourceName = ((AqlDataSource) dataSourceScanOp.getDataSource()).getId().getDatasourceName();
-                if (dataSourcesMap.containsKey(datasourceName)) {
-                    ++(dataSourcesMap.get(datasourceName).first);
-                } else {
-                    dataSourcesMap.put(datasourceName, new Triple<Integer, LogicalOperatorTag, IPhysicalOperator>(1,
-                            LogicalOperatorTag.DATASOURCESCAN, dataSourceScanOp.getPhysicalOperator()));
-                }
-            }
-            extractDataSourcesInfo(descendantOp, dataSourcesMap);
-        }
-
-    }
-
-    private boolean checkIfRuleIsApplicable(AbstractLogicalOperator op) {
-        if (op.getPhysicalOperator() == null) {
-            return false;
-        }
-        if (op.getOperatorTag() == LogicalOperatorTag.EXTENSION_OPERATOR) {
-            ExtensionOperator extensionOp = (ExtensionOperator) op;
-            if (extensionOp.getDelegate() instanceof CommitOperator) {
-                return true;
-            }
-        }
-        if (op.getOperatorTag() == LogicalOperatorTag.DISTRIBUTE_RESULT
-                || op.getOperatorTag() == LogicalOperatorTag.WRITE_RESULT) {
-            return true;
-        }
-        return false;
-    }
-
-    @Override
-    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
-            throws AlgebricksException {
-
-        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
-
-        if (!checkIfRuleIsApplicable(op)) {
-            return false;
-        }
-        Map<String, Triple<Integer, LogicalOperatorTag, IPhysicalOperator>> dataSourcesMap = new HashMap<String, Triple<Integer, LogicalOperatorTag, IPhysicalOperator>>();
-        extractDataSourcesInfo(op, dataSourcesMap);
-
-        boolean introducedInstantLock = false;
-
-        Iterator<Map.Entry<String, Triple<Integer, LogicalOperatorTag, IPhysicalOperator>>> it = dataSourcesMap
-                .entrySet().iterator();
-        while (it.hasNext()) {
-            Entry<String, Triple<Integer, LogicalOperatorTag, IPhysicalOperator>> entry = it.next();
-            Triple<Integer, LogicalOperatorTag, IPhysicalOperator> triple = entry.getValue();
-            if (triple.first == 1) {
-                AqlMetadataImplConfig aqlMetadataImplConfig = new AqlMetadataImplConfig(true);
-                if (triple.second == LogicalOperatorTag.UNNEST_MAP) {
-                    BTreeSearchPOperator pOperator = (BTreeSearchPOperator) triple.third;
-                    pOperator.setImplConfig(aqlMetadataImplConfig);
-                    introducedInstantLock = true;
-                } else {
-                    DataSourceScanPOperator pOperator = (DataSourceScanPOperator) triple.third;
-                    pOperator.setImplConfig(aqlMetadataImplConfig);
-                    introducedInstantLock = true;
-                }
-            }
-
-        }
-        return introducedInstantLock;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixTransactionProperties.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixTransactionProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixTransactionProperties.java
index 27b19e2..abc03e2 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixTransactionProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixTransactionProperties.java
@@ -52,6 +52,9 @@ public class AsterixTransactionProperties extends AbstractAsterixProperties {
     private static final String TXN_LOCK_TIMEOUT_SWEEPTHRESHOLD_KEY = "txn.lock.timeout.sweepthreshold";
     private static final int TXN_LOCK_TIMEOUT_SWEEPTHRESHOLD_DEFAULT = 10000; // 10s
 
+    private static final String TXN_COMMIT_PROFILER_REPORT_INTERVAL_KEY = "txn.commitprofiler.reportinterval";
+    private static final int TXN_COMMIT_PROFILER_REPORT_INTERVAL_DEFAULT = 5; // 5 seconds
+
     public AsterixTransactionProperties(AsterixPropertiesAccessor accessor) {
         super(accessor);
     }
@@ -114,4 +117,9 @@ public class AsterixTransactionProperties extends AbstractAsterixProperties {
                 PropertyInterpreters.getIntegerPropertyInterpreter());
     }
 
+    public int getCommitProfilerReportInterval() {
+        return accessor.getProperty(TXN_COMMIT_PROFILER_REPORT_INTERVAL_KEY,
+                TXN_COMMIT_PROFILER_REPORT_INTERVAL_DEFAULT, PropertyInterpreters.getIntegerPropertyInterpreter());
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
index d53236b..5445b11 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
@@ -28,6 +28,7 @@ import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
 import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
 import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
@@ -44,6 +45,20 @@ public class AsterixLSMInsertDeleteOperatorNodePushable extends LSMIndexInsertUp
     private AbstractLSMIndex lsmIndex;
     private int i = 0;
 
+    /**
+     * The following three variables are used to keep track of the information regarding flushing partial frame such as
+     * 1. whether there was a partial frame flush for the current frame,
+     * ==> captured in flushedPartialTuples variabl
+     * 2. the last flushed tuple index in the frame if there was a partial frame flush,
+     * ==> captured in lastFlushedTupleIdx variable
+     * 3. the current tuple index the frame, where this operator is working on the current tuple.
+     * ==> captured in currentTupleIdx variable
+     * These variables are reset for each frame, i.e., whenever nextFrame() is called, these variables are reset.
+     */
+    private boolean flushedPartialTuples;
+    private int currentTupleIdx;
+    private int lastFlushedTupleIdx;
+
     public boolean isPrimary() {
         return isPrimary;
     }
@@ -60,13 +75,14 @@ public class AsterixLSMInsertDeleteOperatorNodePushable extends LSMIndexInsertUp
         RecordDescriptor inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
         accessor = new FrameTupleAccessor(inputRecDesc);
         writeBuffer = new VSizeFrame(ctx);
+        appender = new FrameTupleAppender(writeBuffer);
         indexHelper.open();
         lsmIndex = (AbstractLSMIndex) indexHelper.getIndexInstance();
         try {
             writer.open();
             modCallback = opDesc.getModificationOpCallbackFactory().createModificationOperationCallback(
                     indexHelper.getResourcePath(), indexHelper.getResourceID(), indexHelper.getResourcePartition(),
-                    lsmIndex, ctx);
+                    lsmIndex, ctx, this);
             indexAccessor = lsmIndex.createAccessor(modCallback, NoOpOperationCallback.INSTANCE);
             ITupleFilterFactory tupleFilterFactory = opDesc.getTupleFilterFactory();
             if (tupleFilterFactory != null) {
@@ -83,11 +99,15 @@ public class AsterixLSMInsertDeleteOperatorNodePushable extends LSMIndexInsertUp
 
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        currentTupleIdx = 0;
+        lastFlushedTupleIdx = 0;
+        flushedPartialTuples = false;
+
         accessor.reset(buffer);
         ILSMIndexAccessor lsmAccessor = (ILSMIndexAccessor) indexAccessor;
         int tupleCount = accessor.getTupleCount();
         try {
-            for (; i < tupleCount; i++) {
+            for (; i < tupleCount; i++, currentTupleIdx++) {
                 if (tupleFilter != null) {
                     frameTuple.reset(accessor, i);
                     if (!tupleFilter.accept(frameTuple)) {
@@ -120,12 +140,34 @@ public class AsterixLSMInsertDeleteOperatorNodePushable extends LSMIndexInsertUp
             FrameDataException fde = new FrameDataException(i, th);
             throw fde;
         }
+
         writeBuffer.ensureFrameSize(buffer.capacity());
-        FrameUtils.copyAndFlip(buffer, writeBuffer.getBuffer());
-        FrameUtils.flushFrame(writeBuffer.getBuffer(), writer);
+        if (flushedPartialTuples) {
+            flushPartialFrame();
+        } else {
+            FrameUtils.copyAndFlip(buffer, writeBuffer.getBuffer());
+            FrameUtils.flushFrame(writeBuffer.getBuffer(), writer);
+        }
         i = 0;
     }
 
+    /**
+     * flushes tuples in a frame from lastFlushedTupleIdx(inclusive) to currentTupleIdx(exclusive)
+     */
+    @Override
+    public void flushPartialFrame() throws HyracksDataException {
+        if (lastFlushedTupleIdx == currentTupleIdx) {
+            //nothing to flush
+            return;
+        }
+        for (int i = lastFlushedTupleIdx; i < currentTupleIdx; i++) {
+            FrameUtils.appendToWriter(writer, appender, accessor, i);
+        }
+        appender.write(writer, true);
+        lastFlushedTupleIdx = currentTupleIdx;
+        flushedPartialTuples = true;
+    }
+
     @Override
     public void close() throws HyracksDataException {
         if (lsmIndex != null) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallbackFactory.java
index 6f54918..d1d869e 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallbackFactory.java
@@ -22,7 +22,6 @@ package org.apache.asterix.common.transactions;
 import java.io.Serializable;
 
 import org.apache.asterix.common.context.ITransactionSubsystemProvider;
-import org.apache.asterix.common.transactions.JobId;
 
 public abstract class AbstractOperationCallbackFactory implements Serializable {
     private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILockManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILockManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILockManager.java
index 7909622..495af9a 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILockManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILockManager.java
@@ -68,7 +68,6 @@ public interface ILockManager {
      * @param lockMode
      * @param txnContext
      * @throws ACIDException
-     *             TODO
      * @return
      */
     public void unlock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
@@ -80,7 +79,6 @@ public interface ILockManager {
      * @param datasetId
      * @param entityHashValue
      * @param lockMode
-     *            TODO
      * @param context
      * @return
      * @throws ACIDException

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
index 7e27c54..b86aebe 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
@@ -35,6 +35,7 @@ public interface ILogRecord {
     public static final int ENTITY_COMMIT_LOG_BASE_SIZE = 30;
     public static final int UPDATE_LOG_BASE_SIZE = 59;
     public static final int FLUSH_LOG_SIZE = 18;
+    public static final int WAIT_LOG_SIZE = 14;
 
     public LogRecord.RECORD_STATUS readLogRecord(ByteBuffer buffer);
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
index 003d4c6..2eb8244 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
@@ -65,6 +65,9 @@ import org.apache.hyracks.storage.am.common.tuples.SimpleTupleWriter;
  * 3) UPDATE: (Header1(6) + Header2(16) + + Header3(20) + Body(9) + Tail(8)) + PKValueSize + NewValueSize
  *    --> UPDATE_LOG_BASE_SIZE = 59
  * 4) FLUSH: 18 bytes (Header1(6) + DatasetId(4) + Tail(8))
+ * 5) WAIT_LOG_SIZE: 14 bytes (Header1(6) + Tail(8))
+ *    --> WAIT_LOG only requires LogType Field, but in order to conform the log reader protocol
+ *        it also includes LogSource and JobId fields.
  */
 
 public class LogRecord implements ILogRecord {
@@ -265,7 +268,16 @@ public class LogRecord implements ILogRecord {
         logType = buffer.get();
         jobId = buffer.getInt();
 
-        if (logType != LogType.FLUSH) {
+        if (logType == LogType.FLUSH) {
+            if (buffer.remaining() < DatasetId.BYTES) {
+                return RECORD_STATUS.TRUNCATED;
+            }
+            datasetId = buffer.getInt();
+            resourceId = 0l;
+            computeAndSetLogSize();
+        } else if (logType == LogType.WAIT) {
+            computeAndSetLogSize();
+        } else {
             if (logType == LogType.JOB_COMMIT || logType == LogType.ABORT) {
                 datasetId = -1;
                 PKHashValue = -1;
@@ -306,15 +318,8 @@ public class LogRecord implements ILogRecord {
             } else {
                 computeAndSetLogSize();
             }
-        } else {
-            computeAndSetLogSize();
-            if (buffer.remaining() < DatasetId.BYTES) {
-                return RECORD_STATUS.TRUNCATED;
-            }
-            datasetId = buffer.getInt();
-            resourceId = 0l;
-            computeAndSetLogSize();
         }
+
         return RECORD_STATUS.OK;
     }
 
@@ -412,6 +417,9 @@ public class LogRecord implements ILogRecord {
             case LogType.FLUSH:
                 logSize = FLUSH_LOG_SIZE;
                 break;
+            case LogType.WAIT:
+                logSize = WAIT_LOG_SIZE;
+                break;
             default:
                 throw new IllegalStateException("Unsupported Log Type");
         }
@@ -425,7 +433,7 @@ public class LogRecord implements ILogRecord {
         builder.append(" LogType : ").append(LogType.toString(logType));
         builder.append(" LogSize : ").append(logSize);
         builder.append(" JobId : ").append(jobId);
-        if (logType == LogType.ENTITY_COMMIT || logType == LogType.UPDATE) {
+        if (logType == LogType.ENTITY_COMMIT || logType == LogType.UPSERT_ENTITY_COMMIT || logType == LogType.UPDATE) {
             builder.append(" DatasetId : ").append(datasetId);
             builder.append(" ResourcePartition : ").append(resourcePartition);
             builder.append(" PKHashValue : ").append(PKHashValue);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java
index d6d2657..714b8f7 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java
@@ -26,6 +26,7 @@ public class LogType {
     public static final byte ABORT = 3;
     public static final byte FLUSH = 4;
     public static final byte UPSERT_ENTITY_COMMIT = 5;
+    public static final byte WAIT = 6;
 
     private static final String STRING_UPDATE = "UPDATE";
     private static final String STRING_JOB_COMMIT = "JOB_COMMIT";
@@ -33,6 +34,7 @@ public class LogType {
     private static final String STRING_ABORT = "ABORT";
     private static final String STRING_FLUSH = "FLUSH";
     private static final String STRING_UPSERT_ENTITY_COMMIT = "UPSERT_ENTITY_COMMIT";
+    private static final String STRING_WAIT = "WAIT";
 
     private static final String STRING_INVALID_LOG_TYPE = "INVALID_LOG_TYPE";
 
@@ -50,6 +52,8 @@ public class LogType {
                 return STRING_FLUSH;
             case LogType.UPSERT_ENTITY_COMMIT:
                 return STRING_UPSERT_ENTITY_COMMIT;
+            case LogType.WAIT:
+                return STRING_WAIT;
             default:
                 return STRING_INVALID_LOG_TYPE;
         }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/ExternalFileIndexAccessor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/ExternalFileIndexAccessor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/ExternalFileIndexAccessor.java
index 5d98961..667bae7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/ExternalFileIndexAccessor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/ExternalFileIndexAccessor.java
@@ -92,7 +92,7 @@ public class ExternalFileIndexAccessor implements Serializable {
 
         // create the accessor  and the cursor using the passed version
         ISearchOperationCallback searchCallback = opDesc.getSearchOpCallbackFactory()
-                .createSearchOperationCallback(indexDataflowHelper.getResourceID(), ctx);
+                .createSearchOperationCallback(indexDataflowHelper.getResourceID(), ctx, null);
         fileIndexAccessor = index.createAccessor(searchCallback, indexDataflowHelper.getVersion());
         fileIndexSearchCursor = fileIndexAccessor.createSearchCursor(false);
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalBTreeSearchOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalBTreeSearchOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalBTreeSearchOperatorNodePushable.java
index 0513f9c..9435387 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalBTreeSearchOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalBTreeSearchOperatorNodePushable.java
@@ -73,7 +73,7 @@ public class ExternalBTreeSearchOperatorNodePushable extends BTreeSearchOperator
             dos = tb.getDataOutput();
             appender = new FrameTupleAppender(new VSizeFrame(ctx));
             ISearchOperationCallback searchCallback = opDesc.getSearchOpCallbackFactory()
-                    .createSearchOperationCallback(indexHelper.getResourceID(), ctx);
+                    .createSearchOperationCallback(indexHelper.getResourceID(), ctx, null);
             // The next line is the reason we override this method
             indexAccessor = externalIndex.createAccessor(searchCallback, dataFlowHelper.getTargetVersion());
             cursor = createCursor();

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalRTreeSearchOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalRTreeSearchOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalRTreeSearchOperatorNodePushable.java
index 75cc1bf..81e6b17 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalRTreeSearchOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalRTreeSearchOperatorNodePushable.java
@@ -72,7 +72,7 @@ public class ExternalRTreeSearchOperatorNodePushable extends RTreeSearchOperator
             dos = tb.getDataOutput();
             appender = new FrameTupleAppender(new VSizeFrame(ctx));
             ISearchOperationCallback searchCallback = opDesc.getSearchOpCallbackFactory()
-                    .createSearchOperationCallback(indexHelper.getResourceID(), ctx);
+                    .createSearchOperationCallback(indexHelper.getResourceID(), ctx, null);
             // The next line is the reason we override this method
             indexAccessor = rTreeIndex.createAccessor(searchCallback, rTreeDataflowHelper.getTargetVersion());
             cursor = createCursor();

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index 542c12d..54ec084 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -79,7 +79,6 @@ import org.apache.asterix.om.base.AString;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallback;
 import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexModificationOperationCallback;
 import org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
@@ -306,15 +305,13 @@ public class MetadataNode implements IMetadataNode {
             IMetadataIndex metadataIndex, ILSMIndex lsmIndex, IndexOperation indexOp) throws ACIDException {
         ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId, false);
 
-        if (metadataIndex.isPrimaryIndex()) {
-            return new PrimaryIndexModificationOperationCallback(metadataIndex.getDatasetId().getId(),
-                    metadataIndex.getPrimaryKeyIndexes(), txnCtx, transactionSubsystem.getLockManager(),
-                    transactionSubsystem, resourceId, metadataStoragePartition, ResourceType.LSM_BTREE, indexOp);
-        } else {
-            return new SecondaryIndexModificationOperationCallback(metadataIndex.getDatasetId().getId(),
-                    metadataIndex.getPrimaryKeyIndexes(), txnCtx, transactionSubsystem.getLockManager(),
-                    transactionSubsystem, resourceId, metadataStoragePartition, ResourceType.LSM_BTREE, indexOp);
-        }
+        // Regardless of the index type (primary or secondary), a secondary index modification callback is returned.
+        // This is still correct because metadata index operations don't require any lock from ConcurrentLockMgr,
+        // and the difference between primaryIndexModCallback and secondaryIndexModCallback is that the primary
+        // index callback requires locks and the secondary index callback doesn't.
+        return new SecondaryIndexModificationOperationCallback(metadataIndex.getDatasetId().getId(),
+                metadataIndex.getPrimaryKeyIndexes(), txnCtx, transactionSubsystem.getLockManager(),
+                transactionSubsystem, resourceId, metadataStoragePartition, ResourceType.LSM_BTREE, indexOp);
     }
 
     @Override
@@ -980,10 +977,8 @@ public class MetadataNode implements IMetadataNode {
             try {
                 while (rangeCursor.hasNext()) {
                     rangeCursor.next();
-                    sb.append(TupleUtils.printTuple(rangeCursor.getTuple(),
-                            new ISerializerDeserializer[] {
-                                    AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(
-                                            BuiltinType.ASTRING),
+                    sb.append(TupleUtils.printTuple(rangeCursor.getTuple(), new ISerializerDeserializer[] {
+                            AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING),
                             AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING),
                             AqlSerializerDeserializerProvider.INSTANCE
                                     .getSerializerDeserializer(BuiltinType.ASTRING) }));
@@ -1000,7 +995,7 @@ public class MetadataNode implements IMetadataNode {
 
     private <ResultType> void searchIndex(JobId jobId, IMetadataIndex index, ITupleReference searchKey,
             IValueExtractor<ResultType> valueExtractor, List<ResultType> results)
-                    throws MetadataException, IndexException, IOException {
+            throws MetadataException, IndexException, IOException {
         IBinaryComparatorFactory[] comparatorFactories = index.getKeyBinaryComparatorFactory();
         String resourceName = index.getFile().toString();
         IIndex indexInstance = datasetLifecycleManager.getIndex(resourceName);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
index 7070a88..ec2e692 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
@@ -148,8 +148,8 @@ public interface IMetadataNode extends Remote, Serializable {
      * @throws MetadataException
      *             For example, if the dataverse does not exist. RemoteException
      */
-    public List<Dataset> getDataverseDatasets(JobId jobId, String dataverseName) throws MetadataException,
-            RemoteException;
+    public List<Dataset> getDataverseDatasets(JobId jobId, String dataverseName)
+            throws MetadataException, RemoteException;
 
     /**
      * Deletes the dataverse with given name, and all it's associated datasets,
@@ -194,8 +194,8 @@ public interface IMetadataNode extends Remote, Serializable {
      *             For example, if the dataset does not exist.
      * @throws RemoteException
      */
-    public Dataset getDataset(JobId jobId, String dataverseName, String datasetName) throws MetadataException,
-            RemoteException;
+    public Dataset getDataset(JobId jobId, String dataverseName, String datasetName)
+            throws MetadataException, RemoteException;
 
     /**
      * Retrieves all indexes of a dataset, acquiring local locks on behalf of
@@ -229,8 +229,8 @@ public interface IMetadataNode extends Remote, Serializable {
      *             For example, if the dataset and/or dataverse does not exist.
      * @throws RemoteException
      */
-    public void dropDataset(JobId jobId, String dataverseName, String datasetName) throws MetadataException,
-            RemoteException;
+    public void dropDataset(JobId jobId, String dataverseName, String datasetName)
+            throws MetadataException, RemoteException;
 
     /**
      * Inserts an index into the metadata, acquiring local locks on behalf of
@@ -313,8 +313,8 @@ public interface IMetadataNode extends Remote, Serializable {
      *             For example, if the datatype does not exist.
      * @throws RemoteException
      */
-    public Datatype getDatatype(JobId jobId, String dataverseName, String datatypeName) throws MetadataException,
-            RemoteException;
+    public Datatype getDatatype(JobId jobId, String dataverseName, String datatypeName)
+            throws MetadataException, RemoteException;
 
     /**
      * Deletes the given datatype in given dataverse, acquiring local locks on
@@ -331,8 +331,8 @@ public interface IMetadataNode extends Remote, Serializable {
      *             deleted.
      * @throws RemoteException
      */
-    public void dropDatatype(JobId jobId, String dataverseName, String datatypeName) throws MetadataException,
-            RemoteException;
+    public void dropDatatype(JobId jobId, String dataverseName, String datatypeName)
+            throws MetadataException, RemoteException;
 
     /**
      * Inserts a node group, acquiring local locks on behalf of the given
@@ -400,8 +400,8 @@ public interface IMetadataNode extends Remote, Serializable {
      * @throws MetadataException
      * @throws RemoteException
      */
-    public Function getFunction(JobId jobId, FunctionSignature functionSignature) throws MetadataException,
-            RemoteException;
+    public Function getFunction(JobId jobId, FunctionSignature functionSignature)
+            throws MetadataException, RemoteException;
 
     /**
      * Deletes a function, acquiring local locks on behalf of the given
@@ -416,8 +416,8 @@ public interface IMetadataNode extends Remote, Serializable {
      *             group to be deleted.
      * @throws RemoteException
      */
-    public void dropFunction(JobId jobId, FunctionSignature functionSignature) throws MetadataException,
-            RemoteException;
+    public void dropFunction(JobId jobId, FunctionSignature functionSignature)
+            throws MetadataException, RemoteException;
 
     /**
      * @param jobId
@@ -438,8 +438,8 @@ public interface IMetadataNode extends Remote, Serializable {
      * @throws MetadataException
      * @throws RemoteException
      */
-    public List<Function> getDataverseFunctions(JobId jobId, String dataverseName) throws MetadataException,
-            RemoteException;
+    public List<Function> getDataverseFunctions(JobId jobId, String dataverseName)
+            throws MetadataException, RemoteException;
 
     /**
      * @param ctx
@@ -448,8 +448,8 @@ public interface IMetadataNode extends Remote, Serializable {
      * @throws MetadataException
      * @throws RemoteException
      */
-    public List<DatasourceAdapter> getDataverseAdapters(JobId jobId, String dataverseName) throws MetadataException,
-            RemoteException;
+    public List<DatasourceAdapter> getDataverseAdapters(JobId jobId, String dataverseName)
+            throws MetadataException, RemoteException;
 
     /**
      * @param jobId
@@ -475,8 +475,8 @@ public interface IMetadataNode extends Remote, Serializable {
      *            if the adapter does not exists.
      * @throws RemoteException
      */
-    public void dropAdapter(JobId jobId, String dataverseName, String adapterName) throws MetadataException,
-            RemoteException;
+    public void dropAdapter(JobId jobId, String dataverseName, String adapterName)
+            throws MetadataException, RemoteException;
 
     /**
      * @param jobId
@@ -495,8 +495,8 @@ public interface IMetadataNode extends Remote, Serializable {
      * @throws MetadataException
      * @throws RemoteException
      */
-    public void addCompactionPolicy(JobId jobId, CompactionPolicy compactionPolicy) throws MetadataException,
-            RemoteException;
+    public void addCompactionPolicy(JobId jobId, CompactionPolicy compactionPolicy)
+            throws MetadataException, RemoteException;
 
     /**
      * @param jobId
@@ -506,8 +506,8 @@ public interface IMetadataNode extends Remote, Serializable {
      * @throws MetadataException
      * @throws RemoteException
      */
-    public CompactionPolicy getCompactionPolicy(JobId jobId, String dataverse, String policy) throws MetadataException,
-            RemoteException;
+    public CompactionPolicy getCompactionPolicy(JobId jobId, String dataverse, String policy)
+            throws MetadataException, RemoteException;
 
     /**
      * @param jobId
@@ -550,7 +550,6 @@ public interface IMetadataNode extends Remote, Serializable {
      */
     public void dropFeed(JobId jobId, String dataverse, String feedName) throws MetadataException, RemoteException;
 
-
     /**
      * @param jobId
      * @param feedPolicy
@@ -567,9 +566,8 @@ public interface IMetadataNode extends Remote, Serializable {
      * @throws MetadataException
      * @throws RemoteException
      */
-    public FeedPolicyEntity getFeedPolicy(JobId jobId, String dataverse, String policy) throws MetadataException,
-            RemoteException;
-
+    public FeedPolicyEntity getFeedPolicy(JobId jobId, String dataverse, String policy)
+            throws MetadataException, RemoteException;
 
     /**
      * Removes a library , acquiring local locks on behalf of the given
@@ -584,8 +582,8 @@ public interface IMetadataNode extends Remote, Serializable {
      *            if the library does not exists.
      * @throws RemoteException
      */
-    public void dropLibrary(JobId jobId, String dataverseName, String libraryName) throws MetadataException,
-            RemoteException;
+    public void dropLibrary(JobId jobId, String dataverseName, String libraryName)
+            throws MetadataException, RemoteException;
 
     /**
      * Adds a library, acquiring local locks on behalf of the given
@@ -612,8 +610,8 @@ public interface IMetadataNode extends Remote, Serializable {
      * @throws MetadataException
      * @throws RemoteException
      */
-    public Library getLibrary(JobId jobId, String dataverseName, String libraryName) throws MetadataException,
-            RemoteException;
+    public Library getLibrary(JobId jobId, String dataverseName, String libraryName)
+            throws MetadataException, RemoteException;
 
     /**
      * Retireve libraries installed in a given dataverse.
@@ -626,8 +624,8 @@ public interface IMetadataNode extends Remote, Serializable {
      * @throws MetadataException
      * @throws RemoteException
      */
-    public List<Library> getDataverseLibraries(JobId jobId, String dataverseName) throws MetadataException,
-            RemoteException;
+    public List<Library> getDataverseLibraries(JobId jobId, String dataverseName)
+            throws MetadataException, RemoteException;
 
     /**
      * @param jobId
@@ -648,8 +646,8 @@ public interface IMetadataNode extends Remote, Serializable {
      * @throws RemoteException
      * @throws MetadataException
      */
-    public void dropFeedPolicy(JobId jobId, String dataverseName, String policyName) throws MetadataException,
-            RemoteException;
+    public void dropFeedPolicy(JobId jobId, String dataverseName, String policyName)
+            throws MetadataException, RemoteException;
 
     /**
      * @param jobId
@@ -658,8 +656,8 @@ public interface IMetadataNode extends Remote, Serializable {
      * @throws MetadataException
      * @throws RemoteException
      */
-    public List<FeedPolicyEntity> getDataversePolicies(JobId jobId, String dataverse) throws MetadataException,
-            RemoteException;
+    public List<FeedPolicyEntity> getDataversePolicies(JobId jobId, String dataverse)
+            throws MetadataException, RemoteException;
 
     /**
      * @param jobId
@@ -731,7 +729,6 @@ public interface IMetadataNode extends Remote, Serializable {
     public ExternalFile getExternalFile(JobId jobId, String dataverseName, String datasetName, Integer fileNumber)
             throws MetadataException, RemoteException;
 
-
     /**
      * update an existing dataset in the metadata, acquiring local locks on behalf
      * of the given transaction id.

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
index 9fe72a4..5e2e227 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
@@ -110,7 +110,6 @@ import org.apache.asterix.transaction.management.opcallbacks.LockThenSearchOpera
 import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory;
 import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallbackFactory;
 import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerProvider;
-import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexSearchOperationCallbackFactory;
 import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexModificationOperationCallbackFactory;
 import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider;
 import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexSearchOperationCallbackFactory;
@@ -730,17 +729,15 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
                     primaryKeyFields[i] = i;
                 }
 
-                AqlMetadataImplConfig aqlMetadataImplConfig = (AqlMetadataImplConfig) implConfig;
                 ITransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
-                if (aqlMetadataImplConfig != null && aqlMetadataImplConfig.isInstantLock()) {
-                    searchCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE
-                            : new PrimaryIndexInstantSearchOperationCallbackFactory(jobId, datasetId, primaryKeyFields,
-                                    txnSubsystemProvider, ResourceType.LSM_BTREE);
-                } else {
-                    searchCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE
-                            : new PrimaryIndexSearchOperationCallbackFactory(jobId, datasetId, primaryKeyFields,
-                                    txnSubsystemProvider, ResourceType.LSM_BTREE);
-                }
+
+                /**
+                 * Due to the read-committed isolation level,
+                 * we may acquire a very short-duration lock (i.e., an instant lock) for readers.
+                 */
+                searchCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE
+                        : new PrimaryIndexInstantSearchOperationCallbackFactory(jobId, datasetId, primaryKeyFields,
+                                txnSubsystemProvider, ResourceType.LSM_BTREE);
             }
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
                     .getMergePolicyFactory(dataset, mdTxnCtx);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
index 83c6e34..7785978 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
@@ -128,10 +128,10 @@ public class AsterixLSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertU
             appender = new FrameTupleAppender(new VSizeFrame(ctx), true);
             modCallback = opDesc.getModificationOpCallbackFactory().createModificationOperationCallback(
                     indexHelper.getResourcePath(), indexHelper.getResourceID(), indexHelper.getResourcePartition(),
-                    index, ctx);
+                    index, ctx, this);
 
             indexAccessor = index.createAccessor(modCallback, opDesc.getSearchOpCallbackFactory()
-                    .createSearchOperationCallback(indexHelper.getResourceID(), ctx));
+                    .createSearchOperationCallback(indexHelper.getResourceID(), ctx, this));
             cursor = indexAccessor.createSearchCursor(false);
             frameTuple = new FrameTupleReference();
             IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
@@ -191,6 +191,7 @@ public class AsterixLSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertU
     //TODO: use tryDelete/tryInsert in order to prevent deadlocks
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+
         accessor.reset(buffer);
         LSMTreeIndexAccessor lsmAccessor = (LSMTreeIndexAccessor) indexAccessor;
         int tupleCount = accessor.getTupleCount();
@@ -239,6 +240,15 @@ public class AsterixLSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertU
         }
     }
 
+    /**
+     * Flushes tuples (which have already been written to tuple appender's buffer in writeOutput() method)
+     * to the next operator/consumer.
+     */
+    @Override
+    public void flushPartialFrame() throws HyracksDataException {
+        appender.write(writer, true);
+    }
+
     private ITupleReference getPrevTupleWithFilter(ITupleReference prevTuple) throws IOException, AsterixException {
         prevRecWithPKWithFilterValue.reset();
         for (int i = 0; i < prevTuple.getFieldCount(); i++) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java
index 49cea94..ef3b218 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java
@@ -20,18 +20,40 @@ package org.apache.asterix.transaction.management.opcallbacks;
 
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.AbstractOperationCallback;
-import org.apache.asterix.common.transactions.ILockManager;
+import org.apache.asterix.common.transactions.ILogManager;
+import org.apache.asterix.common.transactions.ILogRecord;
 import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.common.transactions.ITransactionSubsystem;
+import org.apache.asterix.common.transactions.LogRecord;
+import org.apache.asterix.common.transactions.LogSource;
+import org.apache.asterix.common.transactions.LogType;
 import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
 
 public class LockThenSearchOperationCallback extends AbstractOperationCallback implements ISearchOperationCallback {
 
-    public LockThenSearchOperationCallback(int datasetId, int[] entityIdFields, ILockManager lockManager,
-            ITransactionContext txnCtx) {
-        super(datasetId, entityIdFields, txnCtx, lockManager);
+    /**
+     * variables used for deadlock-free locking protocol
+     */
+    private final LSMIndexInsertUpdateDeleteOperatorNodePushable operatorNodePushable;
+    private final ILogManager logManager;
+    private final ILogRecord logRecord;
+
+    public LockThenSearchOperationCallback(int datasetId, int[] entityIdFields, ITransactionSubsystem txnSubsystem,
+            ITransactionContext txnCtx, IOperatorNodePushable operatorNodePushable) {
+        super(datasetId, entityIdFields, txnCtx, txnSubsystem.getLockManager());
+        this.operatorNodePushable = (LSMIndexInsertUpdateDeleteOperatorNodePushable) operatorNodePushable;
+        this.logManager = txnSubsystem.getLogManager();
+        this.logRecord = new LogRecord();
+        logRecord.setTxnCtx(txnCtx);
+        logRecord.setLogSource(LogSource.LOCAL);
+        logRecord.setLogType(LogType.WAIT);
+        logRecord.setJobId(txnCtx.getJobId().getId());
+        logRecord.computeAndSetLogSize();
     }
 
     @Override
@@ -55,9 +77,49 @@ public class LockThenSearchOperationCallback extends AbstractOperationCallback i
     public void before(ITupleReference tuple) throws HyracksDataException {
         int pkHash = computePrimaryKeyHashValue(tuple, primaryKeyFields);
         try {
-            lockManager.lock(datasetId, pkHash, LockMode.X, txnCtx);
+            if (operatorNodePushable != null) {
+
+                /**********************************************************************************
+                 * In order to achieve a deadlock-free locking protocol during any write (insert/delete/upsert) operation,
+                 * the following logic is implemented.
+                 * See https://cwiki.apache.org/confluence/display/ASTERIXDB/Deadlock-Free+Locking+Protocol for more details.
+                 * 1. for each entry in a frame
+                 * 2. returnValue = tryLock() for an entry
+                 * 3. if returnValue == false
+                 * 3-1. flush all entries (which have already acquired locks) to the next operator
+                 * : this makes all those entries reach the commit operator so that the corresponding commit logs are created.
+                 * 3-2. create a WAIT log and wait until the logFlusher thread flushes the WAIT log and gives a notification
+                 * : this notification guarantees that all locks acquired by this transactor (or all locks acquired for the
+                 * entries) have been released.
+                 * 3-3. acquire the lock using lock() instead of tryLock() for the failed entry
+                 * : we know for sure this lock call will not cause a deadlock since the transactor holds no other locks.
+                 * 4. create an update log and insert the entry
+                 * Of the above steps, steps 2 and 3 are implemented in this before() method.
+                 **********************/
+
+                //try the lock first; if it fails, release all locks held by this actor (a thread) before blocking.
+                boolean tryLockSucceed = lockManager.tryLock(datasetId, pkHash, LockMode.X, txnCtx);
+                if (!tryLockSucceed) {
+                    //flush entries which have already been inserted to release the locks held by them
+                    operatorNodePushable.flushPartialFrame();
+
+                    //create WAIT log and wait until the WAIT log is flushed and notified by LogFlusher thread
+                    logWait();
+
+                    //acquire lock
+                    lockManager.lock(datasetId, pkHash, LockMode.X, txnCtx);
+                }
+
+            } else {
+                //operatorNodePushable can be null when a metadata node operation is executed
+                lockManager.lock(datasetId, pkHash, LockMode.X, txnCtx);
+            }
         } catch (ACIDException e) {
             throw new HyracksDataException(e);
         }
     }
+
+    private void logWait() throws ACIDException {
+        logManager.log(logRecord);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java
index 6bfb6cd..0d65c16 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java
@@ -25,6 +25,7 @@ import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.asterix.common.transactions.JobId;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback;
 import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
@@ -40,13 +41,13 @@ public class LockThenSearchOperationCallbackFactory extends AbstractOperationCal
     }
 
     @Override
-    public ISearchOperationCallback createSearchOperationCallback(long resourceId, IHyracksTaskContext ctx)
-            throws HyracksDataException {
+    public ISearchOperationCallback createSearchOperationCallback(long resourceId, IHyracksTaskContext ctx,
+            IOperatorNodePushable operatorNodePushable) throws HyracksDataException {
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
         try {
             ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId, false);
-            return new LockThenSearchOperationCallback(datasetId, primaryKeyFields, txnSubsystem.getLockManager(),
-                    txnCtx);
+            return new LockThenSearchOperationCallback(datasetId, primaryKeyFields, txnSubsystem, txnCtx,
+                    operatorNodePushable);
         } catch (ACIDException e) {
             throw new HyracksDataException(e);
         }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
index 9b14807..ba97287 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
@@ -26,6 +26,7 @@ import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.asterix.common.transactions.JobId;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback;
 import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
@@ -41,7 +42,7 @@ public class PrimaryIndexInstantSearchOperationCallbackFactory extends AbstractO
     }
 
     @Override
-    public ISearchOperationCallback createSearchOperationCallback(long resourceId, IHyracksTaskContext ctx)
+    public ISearchOperationCallback createSearchOperationCallback(long resourceId, IHyracksTaskContext ctx, IOperatorNodePushable operatorNodePushable)
             throws HyracksDataException {
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
         try {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
index 780f294..4bde490 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
@@ -19,11 +19,14 @@
 
 package org.apache.asterix.transaction.management.opcallbacks;
 
+import org.apache.asterix.common.dataflow.AsterixLSMInsertDeleteOperatorNodePushable;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.ILockManager;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
+import org.apache.asterix.common.transactions.LogType;
 import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
@@ -36,18 +39,57 @@ import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 public class PrimaryIndexModificationOperationCallback extends AbstractIndexModificationOperationCallback
         implements IModificationOperationCallback {
 
+    private final AsterixLSMInsertDeleteOperatorNodePushable operatorNodePushable;
+
     public PrimaryIndexModificationOperationCallback(int datasetId, int[] primaryKeyFields, ITransactionContext txnCtx,
             ILockManager lockManager, ITransactionSubsystem txnSubsystem, long resourceId, int resourcePartition,
-            byte resourceType, IndexOperation indexOp) {
+            byte resourceType, IndexOperation indexOp, IOperatorNodePushable operatorNodePushable) {
         super(datasetId, primaryKeyFields, txnCtx, lockManager, txnSubsystem, resourceId, resourcePartition,
                 resourceType, indexOp);
+        this.operatorNodePushable = (AsterixLSMInsertDeleteOperatorNodePushable) operatorNodePushable; // may be null (before() then uses a plain blocking lock); NOTE(review): the downcast throws ClassCastException for any other non-null IOperatorNodePushable - confirm callers
     }
 
     @Override
     public void before(ITupleReference tuple) throws HyracksDataException {
         int pkHash = computePrimaryKeyHashValue(tuple, primaryKeyFields);
         try {
-            lockManager.lock(datasetId, pkHash, LockMode.X, txnCtx);
+            if (operatorNodePushable != null) {
+
+                /**********************************************************************************
+                 * In order to achieve deadlock-free locking protocol during any write (insert/delete/upsert) operations,
+                 * the following logic is implemented.
+                 * See https://cwiki.apache.org/confluence/display/ASTERIXDB/Deadlock-Free+Locking+Protocol for more details.
+                 * 1. for each entry in a frame
+                 * 2. returnValue = tryLock() for an entry
+                 * 3. if returnValue == false
+                 * 3-1. flush all entries (which already acquired locks) to the next operator
+                 * : this will make all those entries reach commit operator so that corresponding commit logs will be created.
+                 * 3-2. create a WAIT log and wait until the LogFlusher thread flushes the WAIT log and gives notification
+                 * : this notification guarantees that all locks acquired by this transactor (or all locks acquired for the entries)
+                 * were released.
+                 * 3-3. acquire lock using lock() instead of tryLock() for the failed entry
+                 * : we know for sure this lock call will not cause deadlock since the transactor doesn't hold any other locks.
+                 * 4. create an update log and insert the entry
+                 * From the above logic, step 2 and 3 are implemented in this before() method.
+                 **********************/
+
+                //try to lock without blocking; on failure, release all locks held by this actor (which is a thread) by flushing partial frame.
+                boolean tryLockSucceed = lockManager.tryLock(datasetId, pkHash, LockMode.X, txnCtx);
+                if (!tryLockSucceed) {
+                    //flush entries which have been inserted already to release locks held by them
+                    operatorNodePushable.flushPartialFrame();
+
+                    //create WAIT log and wait until the WAIT log is flushed and notified by LogFlusher thread
+                    logWait();
+
+                    //acquire lock
+                    lockManager.lock(datasetId, pkHash, LockMode.X, txnCtx);
+                }
+
+            } else {
+                //operatorNodePushable can be null when metadata node operation is executed
+                lockManager.lock(datasetId, pkHash, LockMode.X, txnCtx);
+            }
         } catch (ACIDException e) {
             throw new HyracksDataException(e);
         }
@@ -62,4 +104,10 @@ public class PrimaryIndexModificationOperationCallback extends AbstractIndexModi
             throw new HyracksDataException(e);
         }
     }
+
+    private void logWait() throws ACIDException { // writes logRecord as a WAIT log through the transaction subsystem's log manager
+        logRecord.setLogType(LogType.WAIT); // NOTE(review): logRecord keeps type WAIT after this method returns; confirm later log writes that reuse logRecord set their own type
+        logRecord.computeAndSetLogSize(); // size is recomputed after the type change and before the record is logged
+        txnSubsystem.getLogManager().log(logRecord);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
index db68b26..c406812 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
@@ -27,6 +27,7 @@ import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.asterix.common.transactions.JobId;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
@@ -51,7 +52,8 @@ public class PrimaryIndexModificationOperationCallbackFactory extends AbstractOp
 
     @Override
     public IModificationOperationCallback createModificationOperationCallback(String resourcePath, long resourceId,
-            int resourcePartition, Object resource, IHyracksTaskContext ctx) throws HyracksDataException {
+            int resourcePartition, Object resource, IHyracksTaskContext ctx, IOperatorNodePushable operatorNodePushable)
+            throws HyracksDataException {
 
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
         IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
@@ -65,7 +67,7 @@ public class PrimaryIndexModificationOperationCallbackFactory extends AbstractOp
             ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId, false);
             IModificationOperationCallback modCallback = new PrimaryIndexModificationOperationCallback(datasetId,
                     primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resourceId,
-                    resourcePartition, resourceType, indexOp);
+                    resourcePartition, resourceType, indexOp, operatorNodePushable);
             txnCtx.registerIndexAndCallback(resourceId, index, (AbstractOperationCallback) modCallback, true);
             return modCallback;
         } catch (ACIDException e) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
index b483674..1dd8368 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
@@ -26,6 +26,7 @@ import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.asterix.common.transactions.JobId;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback;
 import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
@@ -41,7 +42,7 @@ public class PrimaryIndexSearchOperationCallbackFactory extends AbstractOperatio
     }
 
     @Override
-    public ISearchOperationCallback createSearchOperationCallback(long resourceId, IHyracksTaskContext ctx)
+    public ISearchOperationCallback createSearchOperationCallback(long resourceId, IHyracksTaskContext ctx, IOperatorNodePushable operatorNodePushable)
             throws HyracksDataException {
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
         try {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
index ef2b498..168da99 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
@@ -27,6 +27,7 @@ import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.asterix.common.transactions.JobId;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
@@ -48,7 +49,8 @@ public class SecondaryIndexModificationOperationCallbackFactory extends Abstract
 
     @Override
     public IModificationOperationCallback createModificationOperationCallback(String resourcePath, long resourceId,
-            int resourcePartition, Object resource, IHyracksTaskContext ctx) throws HyracksDataException {
+            int resourcePartition, Object resource, IHyracksTaskContext ctx, IOperatorNodePushable operatorNodePushable)
+            throws HyracksDataException {
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
         IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
                 .getDatasetLifecycleManager();

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallbackFactory.java
index 4e1ee63..5dfdcdc 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallbackFactory.java
@@ -20,6 +20,7 @@
 package org.apache.asterix.transaction.management.opcallbacks;
 
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback;
 import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
@@ -29,7 +30,7 @@ public class SecondaryIndexSearchOperationCallbackFactory implements ISearchOper
     private static final long serialVersionUID = 1L;
 
     @Override
-    public ISearchOperationCallback createSearchOperationCallback(long resourceId, IHyracksTaskContext ctx)
+    public ISearchOperationCallback createSearchOperationCallback(long resourceId, IHyracksTaskContext ctx, IOperatorNodePushable operatorNodePushable) // none of the parameters are used; a new SecondaryIndexSearchOperationCallback is returned on every call
             throws HyracksDataException {
         return new SecondaryIndexSearchOperationCallback();
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
index b08798c..8c91c1a 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
@@ -27,6 +27,7 @@ import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.asterix.common.transactions.JobId;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
@@ -49,7 +50,8 @@ public class TempDatasetPrimaryIndexModificationOperationCallbackFactory extends
 
     @Override
     public IModificationOperationCallback createModificationOperationCallback(String resourcePath, long resourceId,
-            int resourcePartition, Object resource, IHyracksTaskContext ctx) throws HyracksDataException {
+            int resourcePartition, Object resource, IHyracksTaskContext ctx, IOperatorNodePushable operatorNodePushable)
+            throws HyracksDataException {
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
         IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
                 .getDatasetLifecycleManager();

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
index 403d68d..3e11531 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
@@ -27,6 +27,7 @@ import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.asterix.common.transactions.JobId;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
@@ -49,7 +50,8 @@ public class TempDatasetSecondaryIndexModificationOperationCallbackFactory exten
 
     @Override
     public IModificationOperationCallback createModificationOperationCallback(String resourcePath, long resourceId,
-            int resourcePartition, Object resource, IHyracksTaskContext ctx) throws HyracksDataException {
+            int resourcePartition, Object resource, IHyracksTaskContext ctx, IOperatorNodePushable operatorNodePushable)
+            throws HyracksDataException {
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
         IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
                 .getDatasetLifecycleManager();



Mime
View raw message