asterixdb-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "abdullah alamoudi (Code Review)" <do-not-re...@asterixdb.incubator.apache.org>
Subject Change in asterixdb[master]: Unblock Dataset Level Lock for Concurrent Jobs
Date Sun, 07 Feb 2016 06:46:39 GMT
abdullah alamoudi has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/619

Change subject: Unblock Dataset Level Lock for Concurrent Jobs
......................................................................

Unblock Dataset Level Lock for Concurrent Jobs

Data modification jobs and certain types of queries acquire
dataset-level locks. Such locks prevent queries and modifications from
executing concurrently. To avoid deadlocks for long-running data
modification jobs, we release the dataset lock per frame in the
presence of waiting transactions on the dataset.

Change-Id: I79bc17dbcc596023bec9122c4425596a2f96a574
---
M asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
M asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor.java
M asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMTreeInsertDeleteOperatorDescriptor.java
M asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallbackFactory.java
M asterix-common/src/main/java/org/apache/asterix/common/transactions/ILockManager.java
M asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
M asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
M asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMInvertedIndexUpsertOperatorDescriptor.java
M asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
M asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMSecondaryUpsertOperatorNodePushable.java
M asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMTreeUpsertOperatorDescriptor.java
M asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ConcurrentLockManager.java
M asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/DummyLockManager.java
M asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockManager.java
M asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java
15 files changed, 171 insertions(+), 56 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/19/619/1

diff --git a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
index d25e51f..7b56585 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
@@ -22,6 +22,9 @@
 
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.common.exceptions.FrameDataException;
+import org.apache.asterix.common.transactions.AbstractOperationCallbackFactory;
+import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.common.transactions.JobId;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -43,6 +46,9 @@
     private final boolean isPrimary;
     private AbstractLSMIndex lsmIndex;
     private int i = 0;
+    private JobId jobId;
+    private ITransactionContext txnContext;
+    private int datasetId;
 
     public boolean isPrimary() {
         return isPrimary;
@@ -50,9 +56,10 @@
 
     public AsterixLSMInsertDeleteOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext
ctx,
             int partition, int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider,
IndexOperation op,
-            boolean isPrimary) {
+            boolean isPrimary, JobId jobId) {
         super(opDesc, ctx, partition, fieldPermutation, recordDescProvider, op);
         this.isPrimary = isPrimary;
+        this.jobId = jobId;
     }
 
     @Override
@@ -74,6 +81,12 @@
             }
             IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
                     .getApplicationContext().getApplicationObject();
+            if (isPrimary) {
+                txnContext = runtimeCtx.getTransactionSubsystem().getTransactionManager().getTransactionContext(jobId,
+                        false);
+                datasetId = ((AbstractOperationCallbackFactory) opDesc.getModificationOpCallbackFactory())
+                        .getDatasetId();
+            }
             AsterixLSMIndexUtil.checkAndSetFirstLSN(lsmIndex, runtimeCtx.getTransactionSubsystem().getLogManager());
         } catch (Throwable th) {
             throw new HyracksDataException(th);
@@ -126,6 +139,9 @@
         FrameUtils.copyAndFlip(buffer, writeBuffer.getBuffer());
         FrameUtils.flushFrame(writeBuffer.getBuffer(), writer);
         i = 0;
+        if (isPrimary) {
+            txnContext.unblockConcurrentTransactions(datasetId);
+        }
     }
 
     @Override
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor.java
b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor.java
index 0d7d1d7..b2e1298 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.common.dataflow;
 
+import org.apache.asterix.common.transactions.JobId;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -41,24 +42,28 @@
 
     private final String indexName;
 
+    protected JobId jobId;
+
     public AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor(IOperatorDescriptorRegistry
spec,
             RecordDescriptor recDesc, IStorageManagerInterface storageManager, IFileSplitProvider
fileSplitProvider,
             IIndexLifecycleManagerProvider lifecycleManagerProvider, ITypeTraits[] tokenTypeTraits,
             IBinaryComparatorFactory[] tokenComparatorFactories, ITypeTraits[] invListsTypeTraits,
             IBinaryComparatorFactory[] invListComparatorFactories, IBinaryTokenizerFactory
tokenizerFactory,
             int[] fieldPermutation, IndexOperation op, IIndexDataflowHelperFactory dataflowHelperFactory,
-            ITupleFilterFactory tupleFilterFactory, IModificationOperationCallbackFactory
modificationOpCallbackFactory, String indexName) {
+            ITupleFilterFactory tupleFilterFactory, IModificationOperationCallbackFactory
modificationOpCallbackFactory,
+            String indexName, JobId jobId) {
         super(spec, recDesc, storageManager, fileSplitProvider, lifecycleManagerProvider,
tokenTypeTraits,
                 tokenComparatorFactories, invListsTypeTraits, invListComparatorFactories,
tokenizerFactory,
                 fieldPermutation, op, dataflowHelperFactory, tupleFilterFactory, modificationOpCallbackFactory);
         this.indexName = indexName;
+        this.jobId = jobId;
     }
 
     @Override
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
{
         return new AsterixLSMInsertDeleteOperatorNodePushable(this, ctx, partition, fieldPermutation,
-                recordDescProvider, op, false);
+                recordDescProvider, op, false, jobId);
     }
 
     public String getIndexName() {
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMTreeInsertDeleteOperatorDescriptor.java
b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMTreeInsertDeleteOperatorDescriptor.java
index c7304a1..9a40214 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMTreeInsertDeleteOperatorDescriptor.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMTreeInsertDeleteOperatorDescriptor.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.common.dataflow;
 
+import org.apache.asterix.common.transactions.JobId;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -43,6 +44,8 @@
 
     private final boolean isPrimary;
 
+    protected final JobId jobId;
+
     /** the name of the index that is being operated upon **/
     private final String indexName;
 
@@ -53,19 +56,20 @@
             IndexOperation op, IIndexDataflowHelperFactory dataflowHelperFactory,
             ITupleFilterFactory tupleFilterFactory, boolean isPrimary, String indexName,
             INullWriterFactory nullWriterFactory, IModificationOperationCallbackFactory modificationOpCallbackProvider,
-            ISearchOperationCallbackFactory searchOpCallbackProvider) {
+            ISearchOperationCallbackFactory searchOpCallbackProvider, JobId jobId) {
         super(spec, recDesc, storageManager, lifecycleManagerProvider, fileSplitProvider,
typeTraits,
                 comparatorFactories, bloomFilterKeyFields, fieldPermutation, op, dataflowHelperFactory,
                 tupleFilterFactory, nullWriterFactory, modificationOpCallbackProvider, searchOpCallbackProvider);
         this.isPrimary = isPrimary;
         this.indexName = indexName;
+        this.jobId = jobId;
     }
 
     @Override
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
throws HyracksDataException {
         return new AsterixLSMInsertDeleteOperatorNodePushable(this, ctx, partition, fieldPermutation,
-                recordDescProvider, op, isPrimary);
+                recordDescProvider, op, isPrimary, jobId);
     }
 
     public boolean isPrimary() {
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallbackFactory.java
b/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallbackFactory.java
index 6f54918..64d9af2 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallbackFactory.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallbackFactory.java
@@ -22,7 +22,6 @@
 import java.io.Serializable;
 
 import org.apache.asterix.common.context.ITransactionSubsystemProvider;
-import org.apache.asterix.common.transactions.JobId;
 
 public abstract class AbstractOperationCallbackFactory implements Serializable {
     private static final long serialVersionUID = 1L;
@@ -41,4 +40,12 @@
         this.txnSubsystemProvider = txnSubsystemProvider;
         this.resourceType = resourceType;
     }
+
+    public JobId getJobId() {
+        return jobId;
+    }
+
+    public int getDatasetId() {
+        return datasetId;
+    }
 }
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILockManager.java
b/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILockManager.java
index 7909622..1845e68 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILockManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILockManager.java
@@ -118,4 +118,10 @@
      */
     public String prettyPrint() throws ACIDException;
 
+    /**
+     * Checks if the dataset with id = {@code datasetId} has other transactions waiting for it.
+     * @param datasetId
+     * @return {@code true}, if another transaction is waiting for the resource to be released,
{@code false}, otherwise.
+     */
+    public boolean hasWaiters(int datasetId);
 }
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
b/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
index 20ede18..e539e5a 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.common.transactions;
 
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 
 public interface ITransactionContext {
@@ -56,4 +57,8 @@
     public void incrementNumActiveOperations();
 
     public void decrementNumActiveOperations();
+
+    public default void unblockConcurrentTransactions(int datasetId) throws HyracksDataException
{
+        throw new HyracksDataException("Unsupported Operation");
+    }
 }
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
index f3523da..945642d 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
@@ -1189,7 +1189,7 @@
                         appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
                         splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields,
                         fieldPermutation, indexOp, idfh, null, true, indexName, null, modificationCallbackFactory,
-                        NoOpOperationCallbackFactory.INSTANCE);
+                        NoOpOperationCallbackFactory.INSTANCE, jobId);
             }
             return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op,
splitsAndConstraint.second);
 
@@ -1676,7 +1676,7 @@
                                 storageProperties.getBloomFilterFalsePositiveRate(), false,
filterTypeTraits,
                                 filterCmpFactories, btreeFields, filterFields, !temp),
                         filterFactory, false, indexName, null, modificationCallbackFactory,
-                        NoOpOperationCallbackFactory.INSTANCE);
+                        NoOpOperationCallbackFactory.INSTANCE, jobId);
             }
             return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op,
splitsAndConstraint.second);
         } catch (MetadataException e) {
@@ -1888,7 +1888,7 @@
                         appContext.getStorageManagerInterface(), splitsAndConstraint.first,
                         appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories,
                         invListsTypeTraits, invListComparatorFactories, tokenizerFactory,
fieldPermutation, indexOp,
-                        indexDataFlowFactory, filterFactory, modificationCallbackFactory,
indexName);
+                        indexDataFlowFactory, filterFactory, modificationCallbackFactory,
indexName, jobId);
             }
             return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op,
splitsAndConstraint.second);
         } catch (MetadataException e) {
@@ -2036,7 +2036,7 @@
                                 storageProperties.getBloomFilterFalsePositiveRate(), rtreeFields,
btreeFields,
                                 filterTypeTraits, filterCmpFactories, filterFields, !temp),
                         filterFactory, false, indexName, null, modificationCallbackFactory,
-                        NoOpOperationCallbackFactory.INSTANCE);
+                        NoOpOperationCallbackFactory.INSTANCE, jobId);
             }
             return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op,
splitsAndConstraint.second);
         } catch (MetadataException | IOException e) {
@@ -2364,7 +2364,7 @@
                     appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
                     splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields,
fieldPermutation,
                     idfh, null, true, indexName, context.getNullWriterFactory(), modificationCallbackFactory,
-                    searchCallbackFactory, null);
+                    searchCallbackFactory, null, jobId);
             op.setType(itemType);
             op.setFilterIndex(fieldIdx);
             return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op,
splitsAndConstraint.second);
@@ -2647,7 +2647,8 @@
                     appContext.getStorageManagerInterface(), splitsAndConstraint.first,
                     appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories,
                     invListsTypeTraits, invListComparatorFactories, tokenizerFactory, fieldPermutation,
-                    indexDataFlowFactory, filterFactory, modificationCallbackFactory, indexName,
prevFieldPermutation);
+                    indexDataFlowFactory, filterFactory, modificationCallbackFactory, indexName,
prevFieldPermutation,
+                    jobId);
 
             return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op,
splitsAndConstraint.second);
         } catch (MetadataException e) {
@@ -2804,7 +2805,7 @@
                             storageProperties.getBloomFilterFalsePositiveRate(), rtreeFields,
btreeFields,
                             filterTypeTraits, filterCmpFactories, filterFields, !temp),
                     filterFactory, false, indexName, null, modificationCallbackFactory,
-                    NoOpOperationCallbackFactory.INSTANCE, prevFieldPermutation);
+                    NoOpOperationCallbackFactory.INSTANCE, prevFieldPermutation, jobId);
             return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op,
splitsAndConstraint.second);
         } catch (MetadataException | IOException e) {
             throw new AlgebricksException(e);
@@ -2953,7 +2954,7 @@
                     appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
                     splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields,
fieldPermutation,
                     idfh, filterFactory, false, indexName, null, modificationCallbackFactory,
-                    NoOpOperationCallbackFactory.INSTANCE, prevFieldPermutation);
+                    NoOpOperationCallbackFactory.INSTANCE, prevFieldPermutation, jobId);
             return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op,
splitsAndConstraint.second);
         } catch (MetadataException e) {
             throw new AlgebricksException(e);
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMInvertedIndexUpsertOperatorDescriptor.java
b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMInvertedIndexUpsertOperatorDescriptor.java
index 3db3de2..dd5b1df 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMInvertedIndexUpsertOperatorDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMInvertedIndexUpsertOperatorDescriptor.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.runtime.operators;
 
 import org.apache.asterix.common.dataflow.AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor;
+import org.apache.asterix.common.transactions.JobId;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -48,11 +49,11 @@
             IBinaryComparatorFactory[] invListComparatorFactories, IBinaryTokenizerFactory
tokenizerFactory,
             int[] fieldPermutation, IIndexDataflowHelperFactory dataflowHelperFactory,
             ITupleFilterFactory tupleFilterFactory, IModificationOperationCallbackFactory
modificationOpCallbackFactory,
-            String indexName, int[] prevFieldPermutation) {
+            String indexName, int[] prevFieldPermutation, JobId jobId) {
         super(spec, recDesc, storageManager, fileSplitProvider, lifecycleManagerProvider,
tokenTypeTraits,
                 tokenComparatorFactories, invListsTypeTraits, invListComparatorFactories,
tokenizerFactory,
                 fieldPermutation, IndexOperation.UPSERT, dataflowHelperFactory, tupleFilterFactory,
-                modificationOpCallbackFactory, indexName);
+                modificationOpCallbackFactory, indexName, jobId);
         this.prevFieldPermutation = prevFieldPermutation;
     }
 
@@ -60,6 +61,6 @@
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
{
         return new AsterixLSMSecondaryUpsertOperatorNodePushable(this, ctx, partition, fieldPermutation,
-                recordDescProvider, prevFieldPermutation);
+                recordDescProvider, prevFieldPermutation, jobId);
     }
 }
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
index f35f4d6..3a85589 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
@@ -25,6 +25,9 @@
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.common.dataflow.AsterixLSMIndexUtil;
 import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.transactions.AbstractOperationCallbackFactory;
+import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.common.transactions.JobId;
 import org.apache.asterix.om.pointables.nonvisitor.ARecordPointable;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.hyracks.api.comm.VSizeFrame;
@@ -72,10 +75,13 @@
     private int presetFieldIndex = -1;
     private ARecordPointable recPointable;
     private DataOutput prevDos;
+    private JobId jobId;
+    private ITransactionContext txnContext;
+    private int datasetId;
 
     public AsterixLSMPrimaryUpsertOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext
ctx,
             int partition, int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider,
int numOfPrimaryKeys,
-            ARecordType recordType, int filterFieldIndex) throws HyracksDataException {
+            ARecordType recordType, int filterFieldIndex, JobId jobId) throws HyracksDataException
{
         super(opDesc, ctx, partition, fieldPermutation, recordDescProvider, IndexOperation.UPSERT);
         // initialize nullWriter
         this.nullWriter = opDesc.getNullWriterFactory().createNullWriter();
@@ -87,6 +93,7 @@
         }
         key.setFieldPermutation(searchKeyPermutations);
         this.numOfPrimaryKeys = numOfPrimaryKeys;
+        this.jobId = jobId;
         if (fieldPermutation.length > numOfPrimaryKeys + 1) {
             isFiltered = true;
             this.recordType = recordType;
@@ -133,6 +140,9 @@
             frameTuple = new FrameTupleReference();
             IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
                     .getApplicationContext().getApplicationObject();
+            txnContext = runtimeCtx.getTransactionSubsystem().getTransactionManager().getTransactionContext(jobId,
+                    false);
+            datasetId = ((AbstractOperationCallbackFactory) opDesc.getModificationOpCallbackFactory()).getDatasetId();
             AsterixLSMIndexUtil.checkAndSetFirstLSN(lsmIndex, runtimeCtx.getTransactionSubsystem().getLogManager());
         } catch (Exception e) {
             indexHelper.close();
@@ -216,6 +226,8 @@
             if (tupleCount > 0) {
                 // All tuples has to move forward to maintain the correctness of the transaction
pipeline
                 appender.write(writer, true);
+                txnContext.unblockConcurrentTransactions(datasetId);
+
             }
         } catch (Exception e) {
             e.printStackTrace();
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMSecondaryUpsertOperatorNodePushable.java
b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMSecondaryUpsertOperatorNodePushable.java
index 65dc83f..e34c40d 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMSecondaryUpsertOperatorNodePushable.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMSecondaryUpsertOperatorNodePushable.java
@@ -20,6 +20,7 @@
 
 import java.nio.ByteBuffer;
 
+import org.apache.asterix.common.transactions.JobId;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -54,13 +55,15 @@
     private int numberOfFields;
     private boolean isNewNull = false;
     private boolean isPrevValueNull = false;
+    private JobId jobId;
 
     public AsterixLSMSecondaryUpsertOperatorNodePushable(IIndexOperatorDescriptor opDesc,
IHyracksTaskContext ctx,
             int partition, int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider,
-            int[] prevValuePermutation) {
+            int[] prevValuePermutation, JobId jobId) {
         super(opDesc, ctx, partition, fieldPermutation, recordDescProvider, IndexOperation.UPSERT);
         this.prevValueTuple.setFieldPermutation(prevValuePermutation);
         this.numberOfFields = prevValuePermutation.length;
+        this.jobId = jobId;
     }
 
     public static boolean equals(byte[] a, int aOffset, int aLength, byte[] b, int bOffset,
int bLength) {
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMTreeUpsertOperatorDescriptor.java
b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMTreeUpsertOperatorDescriptor.java
index 803e15d..db6b1aa 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMTreeUpsertOperatorDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMTreeUpsertOperatorDescriptor.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.runtime.operators;
 
 import org.apache.asterix.common.dataflow.AsterixLSMTreeInsertDeleteOperatorDescriptor;
+import org.apache.asterix.common.transactions.JobId;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
@@ -52,11 +53,11 @@
             IIndexDataflowHelperFactory dataflowHelperFactory, ITupleFilterFactory tupleFilterFactory,
             boolean isPrimary, String indexName, INullWriterFactory nullWriterFactory,
             IModificationOperationCallbackFactory modificationOpCallbackProvider,
-            ISearchOperationCallbackFactory searchOpCallbackProvider, int[] prevValuePermutation)
{
+            ISearchOperationCallbackFactory searchOpCallbackProvider, int[] prevValuePermutation,
JobId jobId) {
         super(spec, recDesc, storageManager, lifecycleManagerProvider, fileSplitProvider,
typeTraits,
                 comparatorFactories, bloomFilterKeyFields, fieldPermutation, IndexOperation.UPSERT,
                 dataflowHelperFactory, tupleFilterFactory, isPrimary, indexName, nullWriterFactory,
-                modificationOpCallbackProvider, searchOpCallbackProvider);
+                modificationOpCallbackProvider, searchOpCallbackProvider, jobId);
         this.prevValuePermutation = prevValuePermutation;
     }
 
@@ -65,9 +66,9 @@
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
throws HyracksDataException {
         return isPrimary()
                 ? new AsterixLSMPrimaryUpsertOperatorNodePushable(this, ctx, partition, fieldPermutation,
-                        recordDescProvider, comparatorFactories.length, type, filterIndex)
+                        recordDescProvider, comparatorFactories.length, type, filterIndex,
jobId)
                 : new AsterixLSMSecondaryUpsertOperatorNodePushable(this, ctx, partition,
fieldPermutation,
-                        recordDescProvider, prevValuePermutation);
+                        recordDescProvider, prevValuePermutation, jobId);
     }
 
     public void setType(ARecordType type) {
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ConcurrentLockManager.java
b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ConcurrentLockManager.java
index e268134..d563f20 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ConcurrentLockManager.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ConcurrentLockManager.java
@@ -104,6 +104,12 @@
     }
 
     @Override
+    public boolean hasWaiters(int datasetId) {
+        final ResourceGroup group = table.get(datasetId, -1);
+        return group.hasWaiters();
+    }
+
+    @Override
     public void lock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext
txnContext)
             throws ACIDException {
         log("lock", datasetId.getId(), entityHashValue, lockMode, txnContext);
@@ -166,6 +172,7 @@
         } catch (InterruptedException e) {
             throw new WaitInterruptedException(txnContext, "interrupted", e);
         } finally {
+
             group.releaseLatch();
         }
 
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/DummyLockManager.java
b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/DummyLockManager.java
index 8ffa775..8a14116 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/DummyLockManager.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/DummyLockManager.java
@@ -28,7 +28,6 @@
 import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
 
-
 /**
  * A dummy implementation of the ILockManager interface. It assumes that all
  * requests are successful. It can be used to for jobs that are known to be
@@ -90,4 +89,9 @@
     public void dumpState(OutputStream os) throws IOException {
     }
 
+    @Override
+    public boolean hasWaiters(int datasetId) {
+        return false;
+    }
+
 }
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockManager.java
b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockManager.java
index f1e1915..3867f20 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockManager.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockManager.java
@@ -95,14 +95,14 @@
         this.waiterLatch = new ReentrantReadWriteLock(true);
         this.jobHT = new HashMap<JobId, JobInfo>();
         this.datasetResourceHT = new HashMap<DatasetId, DatasetLockInfo>();
-        this.entityInfoManager = new EntityInfoManager(txnSubsystem.getTransactionProperties()
-                .getLockManagerShrinkTimer());
+        this.entityInfoManager = new EntityInfoManager(
+                txnSubsystem.getTransactionProperties().getLockManagerShrinkTimer());
         this.lockWaiterManager = new LockWaiterManager();
         this.entityLockInfoManager = new EntityLockInfoManager(entityInfoManager, lockWaiterManager);
-        this.deadlockDetector = new DeadlockDetector(jobHT, datasetResourceHT, entityLockInfoManager,
-                entityInfoManager, lockWaiterManager);
-        this.toutDetector = new TimeOutDetector(this, txnSubsystem.getAsterixAppRuntimeContextProvider()
-                .getThreadExecutor());
+        this.deadlockDetector = new DeadlockDetector(jobHT, datasetResourceHT, entityLockInfoManager, entityInfoManager,
+                lockWaiterManager);
+        this.toutDetector = new TimeOutDetector(this,
+                txnSubsystem.getAsterixAppRuntimeContextProvider().getThreadExecutor());
         this.tempDatasetIdObj = new DatasetId(0);
         this.tempJobIdObj = new JobId(0);
         this.consecutiveWakeupContext = new ConsecutiveWakeupContext();
@@ -200,9 +200,8 @@
                         jobInfo.increaseDatasetISLockCount(dId);
                         if (doEscalate) {
                             throw new IllegalStateException(
-                                    "ESCALATE_TRHESHOLD_ENTITY_TO_DATASET should not be set to "
-                                            + txnSubsystem.getTransactionProperties()
-                                                    .getEntityToDatasetLockEscalationThreshold());
+                                    "ESCALATE_TRHESHOLD_ENTITY_TO_DATASET should not be set to " + txnSubsystem
+                                            .getTransactionProperties().getEntityToDatasetLockEscalationThreshold());
                         }
                     }
                 }
@@ -262,8 +261,8 @@
         return;
     }
 
-    private void releaseDatasetISLocks(JobInfo jobInfo, JobId jobId, DatasetId datasetId, ITransactionContext txnContext)
-            throws ACIDException {
+    private void releaseDatasetISLocks(JobInfo jobInfo, JobId jobId, DatasetId datasetId,
+            ITransactionContext txnContext) throws ACIDException {
         int entityInfo;
         int prevEntityInfo;
         int entityHashValue;
@@ -534,8 +533,8 @@
         return entityInfo;
     }
 
-    private void lockEntityGranule(DatasetId datasetId, int entityHashValue, byte lockMode,
-            int entityInfoFromDLockInfo, ITransactionContext txnContext) throws ACIDException {
+    private void lockEntityGranule(DatasetId datasetId, int entityHashValue, byte lockMode, int entityInfoFromDLockInfo,
+            ITransactionContext txnContext) throws ACIDException {
         JobId jobId = txnContext.getJobId();
         int jId = jobId.getId(); //int-type jobId
         int waiterObjId;
@@ -565,8 +564,8 @@
                     entityInfoManager.setEntityLockMode(entityInfo, LockMode.X);
                     entityInfoManager.increaseEntityLockCount(entityInfo, waiterCount - 1);
 
-                    entityLockInfoManager.increaseLockCount(eLockInfo, LockMode.X, (short) (weakerModeLockCount
-                            + waiterCount - 1));//new lock mode
+                    entityLockInfoManager.increaseLockCount(eLockInfo, LockMode.X,
+                            (short) (weakerModeLockCount + waiterCount - 1));//new lock mode
                     entityLockInfoManager.decreaseLockCount(eLockInfo, LockMode.S, (short) weakerModeLockCount);//old lock mode
                 }
                 return;
@@ -576,7 +575,8 @@
             waiterObjId = entityLockInfoManager.findWaiterFromWaiterList(eLockInfo, jId, entityHashValue);
             if (waiterObjId != -1) {
                 entityInfo = lockWaiterManager.getLockWaiter(waiterObjId).getEntityInfoSlot();
-                waiterCount = handleLockWaiter(dLockInfo, eLockInfo, -1, false, false, txnContext, jobInfo, waiterObjId);
+                waiterCount = handleLockWaiter(dLockInfo, eLockInfo, -1, false, false, txnContext, jobInfo,
+                        waiterObjId);
 
                 if (waiterCount > 0) {
                     entityInfoManager.increaseEntityLockCount(entityInfo, waiterCount);
@@ -606,8 +606,8 @@
                         entityInfoManager.setEntityLockMode(entityInfo, lockMode);
                         entityInfoManager.increaseEntityLockCount(entityInfo, waiterCount - 1);
 
-                        entityLockInfoManager.increaseLockCount(eLockInfo, LockMode.X, (short) (weakerModeLockCount
-                                + waiterCount - 1));//new lock mode
+                        entityLockInfoManager.increaseLockCount(eLockInfo, LockMode.X,
+                                (short) (weakerModeLockCount + waiterCount - 1));//new lock mode
                         entityLockInfoManager.decreaseLockCount(eLockInfo, LockMode.S, (short) weakerModeLockCount);//old lock mode
                     }
 
@@ -753,8 +753,8 @@
 
             if (ALLOW_ESCALATE_FROM_ENTITY_TO_DATASET) {
                 if (!isInstant && datasetLockMode == LockMode.IS) {
-                    jobInfo.decreaseDatasetISLockCount(datasetId.getId(), txnSubsystem.getTransactionProperties()
-                            .getEntityToDatasetLockEscalationThreshold());
+                    jobInfo.decreaseDatasetISLockCount(datasetId.getId(),
+                            txnSubsystem.getTransactionProperties().getEntityToDatasetLockEscalationThreshold());
                 }
             }
 
@@ -1011,8 +1011,8 @@
 
             //#. the datasetLockInfo exists in datasetResourceHT.
             //1. handle dataset-granule lock
-            byte datasetLockMode = entityHashValue == -1 ? lockMode : lockMode == LockMode.S ? LockMode.IS
-                    : LockMode.IX;
+            byte datasetLockMode = entityHashValue == -1 ? lockMode
+                    : lockMode == LockMode.S ? LockMode.IS : LockMode.IX;
             if (datasetLockMode == LockMode.IS) {
                 //[Notice]
                 //Skip checking the dataset level lock compatibility if the requested LockMode is IS lock.
@@ -1267,9 +1267,8 @@
                             //This exception is thrown when the threshold value is set to 1.
                             //We don't want to allow the lock escalation when there is a first lock request on a dataset.
                             throw new IllegalStateException(
-                                    "ESCALATE_TRHESHOLD_ENTITY_TO_DATASET should not be set to "
-                                            + txnSubsystem.getTransactionProperties()
-                                                    .getEntityToDatasetLockEscalationThreshold());
+                                    "ESCALATE_TRHESHOLD_ENTITY_TO_DATASET should not be set to " + txnSubsystem
+                                            .getTransactionProperties().getEntityToDatasetLockEscalationThreshold());
                         }
                     }
                 }
@@ -1720,7 +1719,7 @@
 
     private int handleLockWaiter(DatasetLockInfo dLockInfo, int eLockInfo, int entityInfo, boolean isUpgrade,
             boolean isDatasetLockInfo, ITransactionContext txnContext, JobInfo jobInfo, int duplicatedWaiterObjId)
-            throws ACIDException {
+                    throws ACIDException {
         int waiterId = -1;
         LockWaiter waiter;
         int waiterCount = 0;
@@ -1772,9 +1771,9 @@
                     while (waiter.needWait()) {
                         try {
                             if (IS_DEBUG_MODE) {
-                                System.out.println("" + Thread.currentThread().getName() + "\twaits("
-                                        + waiter.getWaiterCount() + "): WID(" + waiterId + "),EID("
-                                        + waiter.getEntityInfoSlot() + ")");
+                                System.out.println(
+                                        "" + Thread.currentThread().getName() + "\twaits(" + waiter.getWaiterCount()
+                                                + "): WID(" + waiterId + "),EID(" + waiter.getEntityInfoSlot() + ")");
                             }
                             waiter.wait();
                         } catch (InterruptedException e) {
@@ -1854,8 +1853,8 @@
 
     private void requestAbort(ITransactionContext txnContext) throws ACIDException {
         txnContext.setTimeout(true);
-        throw new ACIDException("Transaction " + txnContext.getJobId()
-                + " should abort (requested by the Lock Manager)");
+        throw new ACIDException(
+                "Transaction " + txnContext.getJobId() + " should abort (requested by the Lock Manager)");
     }
 
     /**
@@ -2228,6 +2227,11 @@
             unlatchLockTable();
         }
     }
+
+    @Override
+    public boolean hasWaiters(int datasetId) {
+        throw new UnsupportedOperationException();
+    }
 }
 
 class ConsecutiveWakeupContext {
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java
index 47a92cb..c81619f 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java
@@ -29,6 +29,7 @@
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
 import org.apache.asterix.common.transactions.AbstractOperationCallback;
+import org.apache.asterix.common.transactions.ILockManager;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionManager;
 import org.apache.asterix.common.transactions.JobId;
@@ -94,6 +95,10 @@
 
     private final AtomicInteger transactorNumActiveOperations;
 
+    private boolean waiting;
+
+    private ILockManager lockMgr;
+
     // TODO: implement transactionContext pool in order to avoid object
     // creations.
     // also, the pool can throttle the number of concurrent active jobs at every
@@ -112,6 +117,7 @@
         logRecord = new LogRecord();
         logRecord.setNodeId(transactionSubsystem.getId());
         transactorNumActiveOperations = new AtomicInteger(0);
+        lockMgr = transactionSubsystem.getLockManager();
     }
 
     @Override
@@ -247,6 +253,39 @@
 
     @Override
     public void decrementNumActiveOperations() {
-        transactorNumActiveOperations.decrementAndGet();
+        int numberOfActicOps = transactorNumActiveOperations.decrementAndGet();
+        if (numberOfActicOps == 0 && waiting) {
+            try {
+                lockMgr.releaseLocks(this);
+            } catch (ACIDException e) {
+                throw new IllegalStateException(e);
+            }
+            synchronized (this) {
+                notifyAll();
+            }
+            waiting = false;
+        }
+    }
+
+    @Override
+    public void unblockConcurrentTransactions(int datasetId) throws HyracksDataException {
+        if (lockMgr.hasWaiters(datasetId)) {
+            if (transactorNumActiveOperations.get() > 0) {
+                synchronized (this) {
+                    waiting = true;
+                    try {
+                        wait();
+                    } catch (InterruptedException e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+            } else {
+                try {
+                    lockMgr.releaseLocks(this);
+                } catch (ACIDException e) {
+                    throw new IllegalStateException(e);
+                }
+            }
+        }
     }
 }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/619
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I79bc17dbcc596023bec9122c4425596a2f96a574
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <bamousaa@gmail.com>


Mime
View raw message