asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject [01/13] incubator-asterixdb git commit: Add Support for Upsert Operation
Date Mon, 01 Feb 2016 08:31:20 GMT
Repository: incubator-asterixdb
Updated Branches:
  refs/heads/master ee387c12b -> 5dc958ce1


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallback.java
b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallback.java
new file mode 100644
index 0000000..dfc622a
--- /dev/null
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallback.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.opcallbacks;
+
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.ILockManager;
+import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.common.transactions.ITransactionSubsystem;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
+import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+/**
+ * Index modification callback for the upsert operation. It does nothing in
+ * {@link #before(ITupleReference)} and, in
+ * {@link #found(ITupleReference, ITupleReference)}, logs the {@code after} tuple together
+ * with the hash of its primary key.
+ */
+public class UpsertOperationCallback extends AbstractIndexModificationOperationCallback
+        implements IModificationOperationCallback {
+    /**
+     * Creates the callback. Every argument is forwarded unchanged, and in the same order,
+     * to the superclass constructor; this class keeps no state of its own.
+     */
+    public UpsertOperationCallback(int datasetId, int[] primaryKeyFields, ITransactionContext
txnCtx,
+            ILockManager lockManager, ITransactionSubsystem txnSubsystem, long resourceId,
byte resourceType,
+            IndexOperation indexOp) {
+        super(datasetId, primaryKeyFields, txnCtx, lockManager, txnSubsystem, resourceId,
resourceType, indexOp);
+    }
+    /**
+     * Intentionally empty: no work is performed for {@code tuple} here.
+     *
+     * @param tuple ignored by this implementation
+     */
+    @Override
+    public void before(ITupleReference tuple) throws HyracksDataException {
+        // Do nothing, as lock has been acquired by preceding search
+    }
+    /**
+     * Computes the primary-key hash of {@code after} from {@code primaryKeyFields} and passes
+     * the hash and {@code after} to {@code log} (both helpers are presumably inherited from
+     * the superclass; they are not defined in this class).
+     *
+     * @param before not read by this implementation
+     * @param after the tuple that is hashed and logged
+     * @throws HyracksDataException wrapping any {@link ACIDException} raised while hashing or logging
+     */
+    @Override
+    public void found(ITupleReference before, ITupleReference after) throws HyracksDataException
{
+        try {
+            int pkHash = computePrimaryKeyHashValue(after, primaryKeyFields);
+            log(pkHash, after);
+        } catch (ACIDException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
new file mode 100644
index 0000000..0c83ab5
--- /dev/null
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.opcallbacks;
+
+import org.apache.asterix.common.context.ITransactionSubsystemProvider;
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.AbstractOperationCallback;
+import org.apache.asterix.common.transactions.AbstractOperationCallbackFactory;
+import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.common.transactions.ITransactionSubsystem;
+import org.apache.asterix.common.transactions.JobId;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
+import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+/**
+ * Factory that creates {@link UpsertOperationCallback} instances and registers each one,
+ * together with its index, on the transaction context of the factory's job.
+ */
+public class UpsertOperationCallbackFactory extends AbstractOperationCallbackFactory
+        implements IModificationOperationCallbackFactory {
+
+    private static final long serialVersionUID = 1L;
+    private final IndexOperation indexOp; // handed to every callback this factory creates
+    /**
+     * Stores {@code indexOp} and forwards the remaining arguments to the superclass constructor.
+     */
+    public UpsertOperationCallbackFactory(JobId jobId, int datasetId, int[] primaryKeyFields,
+            ITransactionSubsystemProvider txnSubsystemProvider, IndexOperation indexOp, byte
resourceType) {
+        super(jobId, datasetId, primaryKeyFields, txnSubsystemProvider, resourceType);
+        this.indexOp = indexOp;
+    }
+    /**
+     * Looks up the index named {@code resourceName}, builds an {@link UpsertOperationCallback}
+     * for it, and registers the index and callback on the job's transaction context.
+     *
+     * @param resourceName name used to look up the index in the dataset lifecycle manager
+     * @param resourceId id passed to the callback and used as the registration key; also the
+     *        value shown in the "not registered" error message
+     * @param resource not used by this implementation
+     * @param ctx task context used to obtain the transaction subsystem
+     * @return the newly created and registered callback
+     * @throws HyracksDataException if no index is found for {@code resourceName}, or wrapping
+     *         any {@link ACIDException} raised while obtaining the transaction context or
+     *         registering the callback
+     */
+    @Override
+    public IModificationOperationCallback createModificationOperationCallback(String resourceName,
long resourceId,
+            Object resource, IHyracksTaskContext ctx) throws HyracksDataException {
+        // The index is looked up by name; note that the error below reports resourceId instead.
+        ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
+        IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
+                .getDatasetLifecycleManager();
+        ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourceName);
+        if (index == null) {
+            throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
+        }
+        // NOTE(review): the meaning of the boolean flags passed below (false / true) is not visible here; confirm against the callee signatures.
+        try {
+            ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId,
false);
+            IModificationOperationCallback modCallback = new UpsertOperationCallback(datasetId,
primaryKeyFields,
+                    txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resourceId, resourceType,
indexOp);
+            txnCtx.registerIndexAndCallback(resourceId, index, (AbstractOperationCallback)
modCallback, true);
+            return modCallback;
+        } catch (ACIDException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
index 45ba9bd..6060dd7 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
@@ -254,14 +254,20 @@ public class LogBuffer implements ILogBuffer {
             LogRecord logRecord = logBufferTailReader.next();
             while (logRecord != null) {
                 if (logRecord.getLogSource() == LogSource.LOCAL) {
-                    if (logRecord.getLogType() == LogType.ENTITY_COMMIT) {
+                    if (logRecord.getLogType() == LogType.ENTITY_COMMIT
+                            || logRecord.getLogType() == LogType.UPSERT_ENTITY_COMMIT) {
                         reusableJobId.setId(logRecord.getJobId());
                         txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(reusableJobId,
false);
                         reusableDsId.setId(logRecord.getDatasetId());
                         txnSubsystem.getLockManager().unlock(reusableDsId, logRecord.getPKHashValue(),
LockMode.ANY,
                                 txnCtx);
                         txnCtx.notifyOptracker(false);
-                    } else if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType()
== LogType.ABORT) {
+                        if (logRecord.getLogType() == LogType.UPSERT_ENTITY_COMMIT) {
+                            // since this operation consisted of delete and insert, we need
to notify the optracker twice
+                            txnCtx.notifyOptracker(false);
+                        }
+                    } else if (logRecord.getLogType() == LogType.JOB_COMMIT
+                            || logRecord.getLogType() == LogType.ABORT) {
                         reusableJobId.setId(logRecord.getJobId());
                         txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(reusableJobId,
false);
                         txnCtx.notifyOptracker(true);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
index 2f824df..11dc282 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -126,6 +126,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent
{
      * of the operation, or the system can be recovered concurrently. This kind of concurrency
is
      * not supported, yet.
      */
+    @Override
     public SystemState getSystemState() throws ACIDException {
         //read checkpoint file
         CheckpointObject checkpointObject = null;
@@ -180,6 +181,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent
{
     }
 
     //This method is used only when replication is disabled. Therefore, there is no need
to check logs node ids
+    @Override
     public void startRecovery(boolean synchronous) throws IOException, ACIDException {
         //delete any recovery files from previous failed recovery attempts
         deleteRecoveryTemporaryFiles();
@@ -244,6 +246,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent
{
                         jobCommitLogCount++;
                         break;
                     case LogType.ENTITY_COMMIT:
+                    case LogType.UPSERT_ENTITY_COMMIT:
                         jobId = logRecord.getJobId();
                         if (!jobId2WinnerEntitiesMap.containsKey(jobId)) {
                             jobEntityWinners = new JobEntityCommits(jobId);
@@ -376,6 +379,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent
{
                     case LogType.ENTITY_COMMIT:
                     case LogType.ABORT:
                     case LogType.FLUSH:
+                    case LogType.UPSERT_ENTITY_COMMIT:
                         //do nothing
                         break;
                     default:
@@ -452,6 +456,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent
{
                         updateLogCount++;
                         break;
                     case LogType.JOB_COMMIT:
+                    case LogType.UPSERT_ENTITY_COMMIT:
                         winnerJobSet.add(Integer.valueOf(logRecord.getJobId()));
                         jobId2WinnerEntitiesMap.remove(Integer.valueOf(logRecord.getJobId()));
                         jobCommitLogCount++;
@@ -588,6 +593,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent
{
                     case LogType.ENTITY_COMMIT:
                     case LogType.ABORT:
                     case LogType.FLUSH:
+                    case LogType.UPSERT_ENTITY_COMMIT:
 
                         //do nothing
                         break;
@@ -737,6 +743,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent
{
         return minMCTFirstLSN;
     }
 
+    @Override
     public long getMinFirstLSN() throws HyracksDataException {
         long minFirstLSN = getLocalMinFirstLSN();
 
@@ -749,6 +756,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent
{
         return minFirstLSN;
     }
 
+    @Override
     public long getLocalMinFirstLSN() throws HyracksDataException {
         IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
                 .getDatasetLifecycleManager();
@@ -969,6 +977,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent
{
                             }
                             break;
                         case LogType.ENTITY_COMMIT:
+                        case LogType.UPSERT_ENTITY_COMMIT:
                             jobLoserEntity2LSNsMap.remove(tempKeyTxnId);
                             entityCommitLogCount++;
                             if (IS_DEBUG_MODE) {
@@ -1003,6 +1012,8 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent
{
             while (iter.hasNext()) {
                 Map.Entry<TxnId, List<Long>> loserEntity2LSNsMap = iter.next();
                 undoLSNSet = loserEntity2LSNsMap.getValue();
+                // The step below is important since the upsert operations must be undone in
reverse order.
+                Collections.reverse(undoLSNSet);
                 for (long undoLSN : undoLSNSet) {
                     //here, all the log records are UPDATE type. So, we don't need to check
the type again.
                     //read the corresponding log record to be undone.
@@ -1637,4 +1648,4 @@ class TxnId {
         }
         return size;
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
index 57ced9c..2112097 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -32,6 +32,7 @@ import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionManager;
 import org.apache.asterix.common.transactions.JobId;
 import org.apache.asterix.common.transactions.LogRecord;
+import org.apache.asterix.common.utils.TransactionUtil;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
 
 /**
@@ -58,7 +59,7 @@ public class TransactionManager implements ITransactionManager, ILifeCycleCompon
         try {
             if (txnCtx.isWriteTxn()) {
                 LogRecord logRecord = ((TransactionContext) txnCtx).getLogRecord();
-                logRecord.formJobTerminateLogRecord(txnCtx, false);
+                TransactionUtil.formJobTerminateLogRecord(txnCtx, logRecord, false);
                 txnSubsystem.getLogManager().log(logRecord);
                 txnSubsystem.getRecoveryManager().rollbackTransaction(txnCtx);
             }
@@ -107,7 +108,7 @@ public class TransactionManager implements ITransactionManager, ILifeCycleCompon
         try {
             if (txnCtx.isWriteTxn()) {
                 LogRecord logRecord = ((TransactionContext) txnCtx).getLogRecord();
-                logRecord.formJobTerminateLogRecord(txnCtx, true);
+                TransactionUtil.formJobTerminateLogRecord(txnCtx, logRecord, true);
                 txnSubsystem.getLogManager().log(logRecord);
             }
         } catch (Exception ae) {
@@ -123,8 +124,8 @@ public class TransactionManager implements ITransactionManager, ILifeCycleCompon
     }
 
     @Override
-    public void completedTransaction(ITransactionContext txnContext, DatasetId datasetId,
int PKHashVal, boolean success)
-            throws ACIDException {
+    public void completedTransaction(ITransactionContext txnContext, DatasetId datasetId,
int PKHashVal,
+            boolean success) throws ACIDException {
         if (!success) {
             abortTransaction(txnContext, datasetId, PKHashVal);
         } else {


Mime
View raw message