asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject incubator-asterixdb git commit: Optionally log the before image when the before image is found in the memory component
Date Fri, 03 Jun 2016 19:13:09 GMT
Repository: incubator-asterixdb
Updated Branches:
  refs/heads/master 2dff79736 -> df0887053


Optionally log the before image when the before image is found in the memory component

In addition, this change fixes an issue with one of the test cases for
FrameSpiller.

Change-Id: Iaaed48f4c2ca8d83253e81cd7c60aad998b67b1e
Reviewed-on: https://asterix-gerrit.ics.uci.edu/900
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <michael.blow@couchbase.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/commit/df088705
Tree: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/tree/df088705
Diff: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/diff/df088705

Branch: refs/heads/master
Commit: df08870532588d3e23e123c49e24ac2e8efbf8c0
Parents: 2dff797
Author: Abdullah Alamoudi <bamousaa@gmail.com>
Authored: Fri Jun 3 18:54:51 2016 +0300
Committer: abdullah alamoudi <bamousaa@gmail.com>
Committed: Fri Jun 3 12:11:44 2016 -0700

----------------------------------------------------------------------
 .../config/AsterixPropertiesAccessor.java       |   5 +-
 ...erixLSMInsertDeleteOperatorNodePushable.java |   6 +-
 .../asterix/common/transactions/ILogRecord.java |   8 +-
 .../asterix/common/transactions/LogRecord.java  | 161 +++++++++++++------
 .../external/feed/test/InputHandlerTest.java    |  83 +++++-----
 .../apache/asterix/metadata/MetadataNode.java   |  10 +-
 .../metadata/declared/AqlMetadataProvider.java  |  44 ++---
 ...tractIndexModificationOperationCallback.java |  12 +-
 ...imaryIndexModificationOperationCallback.java |  11 +-
 ...dexModificationOperationCallbackFactory.java |  11 +-
 ...ndaryIndexModificationOperationCallback.java |   6 +-
 ...dexModificationOperationCallbackFactory.java |   7 +-
 .../opcallbacks/UpsertOperationCallback.java    |   6 +-
 .../UpsertOperationCallbackFactory.java         |  15 +-
 asterixdb/pom.xml                               |   2 +-
 .../am/common/tuples/SimpleTupleWriter.java     |  15 +-
 .../common/tuples/SimpleTupleWriterFactory.java |  34 ----
 .../storage/am/lsm/common/impls/LSMHarness.java |   4 +-
 18 files changed, 249 insertions(+), 191 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/df088705/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
index 45e3b06..507a393 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
@@ -132,7 +132,7 @@ public class AsterixPropertiesAccessor {
     /**
      * Constructor which wraps an IApplicationConfig.
      */
-    public AsterixPropertiesAccessor (IApplicationConfig cfg) {
+    public AsterixPropertiesAccessor(IApplicationConfig cfg) {
         this.cfg = cfg;
         instanceName = cfg.getString("asterix", "instance", "DEFAULT_INSTANCE");
         String mdNode = null;
@@ -234,7 +234,8 @@ public class AsterixPropertiesAccessor {
             return interpreter.interpret(value);
         } catch (IllegalArgumentException e) {
             if (LOGGER.isLoggable(Level.SEVERE)) {
-                StringBuilder msg = new StringBuilder("Invalid property value '" + value + "' for property '" + property + "'.\n");
+                StringBuilder msg =
+                        new StringBuilder("Invalid property value '" + value + "' for property '" + property + "'.\n");
                 if (p != null) {
                     msg.append("See the description: \n" + p.getDescription() + "\n");
                 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/df088705/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
index 5445b11..9a76b40 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
@@ -48,7 +48,7 @@ public class AsterixLSMInsertDeleteOperatorNodePushable extends LSMIndexInsertUp
     /**
      * The following three variables are used to keep track of the information regarding flushing partial frame such as
      * 1. whether there was a partial frame flush for the current frame,
-     * ==> captured in flushedPartialTuples variabl
+     * ==> captured in flushedPartialTuples variable
      * 2. the last flushed tuple index in the frame if there was a partial frame flush,
      * ==> captured in lastFlushedTupleIdx variable
      * 3. the current tuple index the frame, where this operator is working on the current tuple.
@@ -89,8 +89,8 @@ public class AsterixLSMInsertDeleteOperatorNodePushable extends LSMIndexInsertUp
                 tupleFilter = tupleFilterFactory.createTupleFilter(indexHelper.getTaskContext());
                 frameTuple = new FrameTupleReference();
             }
-            IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
-                    .getApplicationContext().getApplicationObject();
+            IAsterixAppRuntimeContext runtimeCtx =
+                    (IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
             AsterixLSMIndexUtil.checkAndSetFirstLSN(lsmIndex, runtimeCtx.getTransactionSubsystem().getLogManager());
         } catch (Throwable th) {
             throw new HyracksDataException(th);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/df088705/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
index 1992a00..cd05ba0 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
@@ -78,8 +78,6 @@ public interface ILogRecord {
 
     public void setNewOp(byte newOp);
 
-    public int getNewValueSize();
-
     public void setNewValueSize(int newValueSize);
 
     public ITupleReference getNewValue();
@@ -134,4 +132,10 @@ public interface ILogRecord {
     public boolean isReplicated();
 
     public void writeRemoteLogRecord(ByteBuffer buffer);
+
+    public ITupleReference getOldValue();
+
+    public void setOldValue(ITupleReference oldValue);
+
+    public void setOldValueSize(int oldValueSize);
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/df088705/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
index 4823a92..23fdd0f 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
@@ -100,10 +100,13 @@ public class LogRecord implements ILogRecord {
     private long resourceId;
     private int resourcePartition;
     private int logSize;
-    private int fieldCnt;
+    private int newValueFieldCount;
     private byte newOp;
     private int newValueSize;
     private ITupleReference newValue;
+    private int oldValueSize;
+    private ITupleReference oldValue;
+    private int oldValueFieldCount;
     private long checksum;
     // ------------- fields in a log record (end) --------------//
 
@@ -111,9 +114,9 @@ public class LogRecord implements ILogRecord {
     private ITransactionContext txnCtx;
     private long LSN;
     private final AtomicBoolean isFlushed;
-    private final SimpleTupleWriter tupleWriter;
     private final PrimaryKeyTupleReference readPKValue;
     private final SimpleTupleReference readNewValue;
+    private final SimpleTupleReference readOldValue;
     private final CRC32 checksumGen;
     private int[] PKFields;
     private PrimaryIndexOperationTracker opTracker;
@@ -128,9 +131,9 @@ public class LogRecord implements ILogRecord {
 
     public LogRecord() {
         isFlushed = new AtomicBoolean(false);
-        tupleWriter = new SimpleTupleWriter();
         readPKValue = new PrimaryKeyTupleReference();
-        readNewValue = (SimpleTupleReference) tupleWriter.createTupleReference();
+        readNewValue = SimpleTupleWriter.INSTANCE.createTupleReference();
+        readOldValue = SimpleTupleWriter.INSTANCE.createTupleReference();
         checksumGen = new CRC32();
         logSource = LogSource.LOCAL;
     }
@@ -152,10 +155,15 @@ public class LogRecord implements ILogRecord {
         if (logType == LogType.UPDATE) {
             buffer.putLong(resourceId);
             buffer.putInt(logSize);
-            buffer.putInt(fieldCnt);
+            buffer.putInt(newValueFieldCount);
             buffer.put(newOp);
             buffer.putInt(newValueSize);
             writeTuple(buffer, newValue, newValueSize);
+            if (oldValueSize > 0) {
+                buffer.putInt(oldValueSize);
+                buffer.putInt(oldValueFieldCount);
+                writeTuple(buffer, oldValue, oldValueSize);
+            }
         }
         if (logType == LogType.FLUSH) {
             buffer.putInt(datasetId);
@@ -195,7 +203,7 @@ public class LogRecord implements ILogRecord {
 
     private void writeTuple(ByteBuffer buffer, ITupleReference tuple, int size) {
         if (logSource == LogSource.LOCAL) {
-            tupleWriter.writeTuple(tuple, buffer.array(), buffer.position());
+            SimpleTupleWriter.INSTANCE.writeTuple(tuple, buffer.array(), buffer.position());
         } else {
             //since the tuple is already serialized in remote logs, just copy it from beginning to end.
             System.arraycopy(tuple.getFieldData(0), 0, buffer.array(), buffer.position(), size);
@@ -241,64 +249,93 @@ public class LogRecord implements ILogRecord {
         logSource = buffer.get();
         logType = buffer.get();
         jobId = buffer.getInt();
-
-        if (logType == LogType.FLUSH) {
-            if (buffer.remaining() < DatasetId.BYTES) {
-                return RecordReadStatus.TRUNCATED;
-            }
-            datasetId = buffer.getInt();
-            resourceId = 0l;
-            computeAndSetLogSize();
-        } else if (logType == LogType.WAIT) {
-            computeAndSetLogSize();
-        } else {
-            if (logType == LogType.JOB_COMMIT || logType == LogType.ABORT) {
-                datasetId = -1;
-                PKHashValue = -1;
-            } else {
-                //attempt to read in the resourcePartition, dsid, PK hash and PK length
-                if (buffer.remaining() < ENTITYCOMMIT_UPDATE_HEADER_LEN) {
+        switch (logType) {
+            case LogType.FLUSH:
+                if (buffer.remaining() < DatasetId.BYTES) {
                     return RecordReadStatus.TRUNCATED;
                 }
-                resourcePartition = buffer.getInt();
                 datasetId = buffer.getInt();
-                PKHashValue = buffer.getInt();
-                PKValueSize = buffer.getInt();
-                // attempt to read in the PK
-                if (buffer.remaining() < PKValueSize) {
+                resourceId = 0L;
+                break;
+            case LogType.ABORT:
+            case LogType.JOB_COMMIT:
+                datasetId = -1;
+                PKHashValue = -1;
+                break;
+            case LogType.ENTITY_COMMIT:
+            case LogType.UPSERT_ENTITY_COMMIT:
+                if (!readEntityInfo(buffer)) {
                     return RecordReadStatus.TRUNCATED;
                 }
-                if (PKValueSize <= 0) {
-                    throw new IllegalStateException("Primary Key Size is less than or equal to 0");
+                break;
+            case LogType.UPDATE:
+                if (!readEntityInfo(buffer)) {
+                    return RecordReadStatus.TRUNCATED;
                 }
-                PKValue = readPKValue(buffer);
-            }
-
-            if (logType == LogType.UPDATE) {
-                // attempt to read in the previous LSN, log size, new value size, and new record type
                 if (buffer.remaining() < UPDATE_LSN_HEADER + UPDATE_BODY_HEADER) {
                     return RecordReadStatus.TRUNCATED;
                 }
                 resourceId = buffer.getLong();
                 logSize = buffer.getInt();
-                fieldCnt = buffer.getInt();
+                newValueFieldCount = buffer.getInt();
                 newOp = buffer.get();
                 newValueSize = buffer.getInt();
-                if (buffer.remaining() < newValueSize) {
-                    if (logSize > buffer.capacity()) {
-                        return RecordReadStatus.LARGE_RECORD;
-                    }
-                    return RecordReadStatus.TRUNCATED;
-                }
-                newValue = readTuple(buffer, readNewValue, fieldCnt, newValueSize);
-            } else {
-                computeAndSetLogSize();
-            }
+                return readEntity(buffer);
+            default:
+                break;
         }
+        computeAndSetLogSize();
+        return RecordReadStatus.OK;
+    }
 
+    private RecordReadStatus readEntity(ByteBuffer buffer) {
+        if (buffer.remaining() < newValueSize) {
+            if (logSize > buffer.capacity()) {
+                return RecordReadStatus.LARGE_RECORD;
+            }
+            return RecordReadStatus.TRUNCATED;
+        }
+        newValue = readTuple(buffer, readNewValue, newValueFieldCount, newValueSize);
+        if (logSize > getUpdateLogSizeWithoutOldValue()) {
+            // Prev Image exists
+            if (buffer.remaining() < Integer.BYTES) {
+                return RecordReadStatus.TRUNCATED;
+            }
+            oldValueSize = buffer.getInt();
+            if (buffer.remaining() < Integer.BYTES) {
+                return RecordReadStatus.TRUNCATED;
+            }
+            oldValueFieldCount = buffer.getInt();
+            if (buffer.remaining() < oldValueSize) {
+                return RecordReadStatus.TRUNCATED;
+            }
+            oldValue = readTuple(buffer, readOldValue, oldValueFieldCount, oldValueSize);
+        } else {
+            oldValueSize = 0;
+        }
         return RecordReadStatus.OK;
     }
 
+    private boolean readEntityInfo(ByteBuffer buffer) {
+        //attempt to read in the resourcePartition, dsid, PK hash and PK length
+        if (buffer.remaining() < ENTITYCOMMIT_UPDATE_HEADER_LEN) {
+            return false;
+        }
+        resourcePartition = buffer.getInt();
+        datasetId = buffer.getInt();
+        PKHashValue = buffer.getInt();
+        PKValueSize = buffer.getInt();
+        // attempt to read in the PK
+        if (buffer.remaining() < PKValueSize) {
+            return false;
+        }
+        if (PKValueSize <= 0) {
+            throw new IllegalStateException("Primary Key Size is less than or equal to 0");
+        }
+        PKValue = readPKValue(buffer);
+        return true;
+    }
+
     @Override
     public void readRemoteLog(ByteBuffer buffer) {
         //read common fields
@@ -345,7 +382,14 @@ public class LogRecord implements ILogRecord {
     }
 
     private void setUpdateLogSize() {
-        logSize = UPDATE_LOG_BASE_SIZE + PKValueSize + newValueSize;
+        logSize = getUpdateLogSizeWithoutOldValue();
+        if (oldValueSize > 0) {
+            logSize += /*size*/Integer.BYTES + /*fieldCount*/Integer.BYTES + /*tuple*/oldValueSize;
+        }
+    }
+
+    private int getUpdateLogSizeWithoutOldValue() {
+        return UPDATE_LOG_BASE_SIZE + PKValueSize + newValueSize;
     }
 
     @Override
@@ -504,11 +548,6 @@ public class LogRecord implements ILogRecord {
     }
 
     @Override
-    public int getNewValueSize() {
-        return newValueSize;
-    }
-
-    @Override
     public void setNewValueSize(int newValueSize) {
         this.newValueSize = newValueSize;
     }
@@ -521,7 +560,7 @@ public class LogRecord implements ILogRecord {
     @Override
     public void setNewValue(ITupleReference newValue) {
         this.newValue = newValue;
-        this.fieldCnt = newValue.getFieldCount();
+        this.newValueFieldCount = newValue.getFieldCount();
     }
 
     @Override
@@ -633,4 +672,20 @@ public class LogRecord implements ILogRecord {
     public boolean isReplicated() {
         return replicated;
     }
+
+    @Override
+    public ITupleReference getOldValue() {
+        return oldValue;
+    }
+
+    @Override
+    public void setOldValue(ITupleReference oldValue) {
+        this.oldValue = oldValue;
+        this.oldValueFieldCount = oldValue.getFieldCount();
+    }
+
+    @Override
+    public void setOldValueSize(int oldValueSize) {
+        this.oldValueSize = oldValueSize;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/df088705/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
index 3becd96..bc1c328 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
@@ -679,47 +679,50 @@ public class InputHandlerTest extends TestCase {
      */
     @org.junit.Test
     public void testMemoryVarSizeFrameWithSpillNoDiscard() {
-        try {
-            Random random = new Random();
-            IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
-            // Spill budget = Memory budget, No discard
-            FeedPolicyAccessor fpa =
-                    createFeedPolicyAccessor(true, false, DEFAULT_FRAME_SIZE * NUM_FRAMES, DISCARD_ALLOWANCE);
-            // Non-Active Writer
-            TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
-            writer.freeze();
-            // FramePool
-            ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
-            FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
-            handler.open();
-            ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
-            int multiplier = 1;
-            // add NUM_FRAMES times
-            while ((multiplier <= framePool.remaining())) {
-                handler.nextFrame(buffer);
-                multiplier = random.nextInt(10) + 1;
-                buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * multiplier);
-            }
-            // Next call should Not block. we will do it in a different thread
-            Future<?> result = EXECUTOR.submit(new Pusher(buffer, handler));
-            result.get();
-            // Check that no records were discarded
-            assertEquals(handler.getNumDiscarded(), 0);
-            // Check that one frame is spilled
-            assertEquals(handler.getNumSpilled(), 1);
-            // consume memory frames
-            while (!handler.getInternalBuffer().isEmpty()) {
-                writer.kick();
+        for (int k = 0; k < 1000; k++) {
+            try {
+                Random random = new Random();
+                IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+                // Spill budget = Memory budget, No discard
+                FeedPolicyAccessor fpa =
+                        createFeedPolicyAccessor(true, false, DEFAULT_FRAME_SIZE * NUM_FRAMES, DISCARD_ALLOWANCE);
+                // Non-Active Writer
+                TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
+                writer.freeze();
+                // FramePool
+                ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
+                FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
+                handler.open();
+                ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
+                int multiplier = 1;
+                // add NUM_FRAMES times
+                while ((multiplier <= framePool.remaining())) {
+                    handler.nextFrame(buffer);
+                    multiplier = random.nextInt(10) + 1;
+                    buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * multiplier);
+                }
+                // Next call should Not block. we will do it in a different thread
+                Future<?> result = EXECUTOR.submit(new Pusher(buffer, handler));
+                result.get();
+                // Check that no records were discarded
+                assertEquals(handler.getNumDiscarded(), 0);
+                // Check that one frame is spilled
+                assertEquals(handler.getNumSpilled(), 1);
+                int numOfBuffersInMemory = handler.getInternalBuffer().size();
+                // consume memory frames
+                while (numOfBuffersInMemory > 0) {
+                    writer.kick();
+                    numOfBuffersInMemory--;
+                }
+                // There should be 1 frame on disk
+                Assert.assertEquals(1, handler.framesOnDisk());
+                writer.unfreeze();
+                handler.close();
+                Assert.assertEquals(0, handler.framesOnDisk());
+            } catch (Throwable th) {
+                th.printStackTrace();
+                Assert.fail();
             }
-            // There should be 1 frame on disk
-            Assert.assertEquals(1, handler.framesOnDisk());
-            writer.unfreeze();
-            result.get();
-            handler.close();
-            Assert.assertEquals(0, handler.framesOnDisk());
-        } catch (Throwable th) {
-            th.printStackTrace();
-            Assert.fail();
         }
         Assert.assertNull(cause);
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/df088705/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index b82c9ee..3b432a6 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -315,7 +315,7 @@ public class MetadataNode implements IMetadataNode {
         // locks and secondary index doesn't.
         return new SecondaryIndexModificationOperationCallback(metadataIndex.getDatasetId().getId(),
                 metadataIndex.getPrimaryKeyIndexes(), txnCtx, transactionSubsystem.getLockManager(),
-                transactionSubsystem, resourceId, metadataStoragePartition, ResourceType.LSM_BTREE, indexOp);
+                transactionSubsystem, resourceId, metadataStoragePartition, ResourceType.LSM_BTREE, indexOp, false);
     }
 
     @Override
@@ -996,10 +996,8 @@ public class MetadataNode implements IMetadataNode {
             try {
                 while (rangeCursor.hasNext()) {
                     rangeCursor.next();
-                    sb.append(TupleUtils.printTuple(rangeCursor.getTuple(),
-                            new ISerializerDeserializer[] {
-                                    AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(
-                                            BuiltinType.ASTRING),
+                    sb.append(TupleUtils.printTuple(rangeCursor.getTuple(), new ISerializerDeserializer[] {
+                            AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING),
                             AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING),
                             AqlSerializerDeserializerProvider.INSTANCE
                                     .getSerializerDeserializer(BuiltinType.ASTRING) }));
@@ -1016,7 +1014,7 @@ public class MetadataNode implements IMetadataNode {
 
     private <ResultType> void searchIndex(JobId jobId, IMetadataIndex index, ITupleReference searchKey,
             IValueExtractor<ResultType> valueExtractor, List<ResultType> results)
-                    throws MetadataException, IndexException, IOException {
+            throws MetadataException, IndexException, IOException {
         IBinaryComparatorFactory[] comparatorFactories = index.getKeyBinaryComparatorFactory();
         String resourceName = index.getFile().toString();
         IIndex indexInstance = datasetLifecycleManager.getIndex(resourceName);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/df088705/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
index 28f8a79..323ea6d 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
@@ -824,8 +824,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
 
             RTreeSearchOperatorDescriptor rtreeSearchOp;
             if (dataset.getDatasetType() == DatasetType.INTERNAL) {
-                IBinaryComparatorFactory[] deletedKeyBTreeCompFactories = getMergedComparatorFactories(
-                        comparatorFactories, primaryComparatorFactories);
+                IBinaryComparatorFactory[] deletedKeyBTreeCompFactories =
+                        getMergedComparatorFactories(comparatorFactories, primaryComparatorFactories);
                 IIndexDataflowHelperFactory idff = new LSMRTreeWithAntiMatterTuplesDataflowHelperFactory(
                         valueProviderFactories, RTreePolicyType.RTREE, deletedKeyBTreeCompFactories,
                         new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), compactionInfo.first,
@@ -1135,7 +1135,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
                     ? new TempDatasetPrimaryIndexModificationOperationCallbackFactory(jobId, datasetId,
                             primaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE)
                     : new PrimaryIndexModificationOperationCallbackFactory(jobId, datasetId, primaryKeyFields,
-                            txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE);
+                            txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE, dataset.hasMetaPart());
 
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                     DatasetUtils.getMergePolicyFactory(dataset, mdTxnCtx);
@@ -1614,8 +1614,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
                     ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
                             modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE)
                     : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
-                            ResourceType.LSM_BTREE);
+                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE,
+                            dataset.hasMetaPart());
 
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                     DatasetUtils.getMergePolicyFactory(dataset, mdTxnCtx);
@@ -1819,7 +1819,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
                             ResourceType.LSM_INVERTED_INDEX)
                     : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
                             modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
-                            ResourceType.LSM_INVERTED_INDEX);
+                            ResourceType.LSM_INVERTED_INDEX, dataset.hasMetaPart());
 
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                     DatasetUtils.getMergePolicyFactory(dataset, mdTxnCtx);
@@ -1889,8 +1889,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
             Pair<IAType, Boolean> keyPairType =
                     Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0), secondaryKeyExprs.get(0), recType);
             IAType spatialType = keyPairType.first;
-            boolean isPointMBR = spatialType.getTypeTag() == ATypeTag.POINT
-                    || spatialType.getTypeTag() == ATypeTag.POINT3D;
+            boolean isPointMBR =
+                    spatialType.getTypeTag() == ATypeTag.POINT || spatialType.getTypeTag() == ATypeTag.POINT3D;
             int dimension = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
             int numSecondaryKeys = dimension * 2;
             int numPrimaryKeys = primaryKeys.size();
@@ -1969,14 +1969,14 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
                     ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
                             modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_RTREE)
                     : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
-                            ResourceType.LSM_RTREE);
+                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_RTREE,
+                            dataset.hasMetaPart());
 
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                     DatasetUtils.getMergePolicyFactory(dataset, mdTxnCtx);
 
-            IBinaryComparatorFactory[] deletedKeyBTreeCompFactories = getMergedComparatorFactories(comparatorFactories,
-                    primaryComparatorFactories);
+            IBinaryComparatorFactory[] deletedKeyBTreeCompFactories =
+                    getMergedComparatorFactories(comparatorFactories, primaryComparatorFactories);
             IIndexDataflowHelperFactory idff = new LSMRTreeWithAntiMatterTuplesDataflowHelperFactory(
                     valueProviderFactories, RTreePolicyType.RTREE, deletedKeyBTreeCompFactories,
                     new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), compactionInfo.first,
@@ -2300,7 +2300,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
                     ? new TempDatasetPrimaryIndexModificationOperationCallbackFactory(jobId, datasetId,
                             primaryKeyFields, txnSubsystemProvider, IndexOperation.UPSERT, ResourceType.LSM_BTREE)
                     : new UpsertOperationCallbackFactory(jobId, datasetId, primaryKeyFields, txnSubsystemProvider,
-                            IndexOperation.UPSERT, ResourceType.LSM_BTREE);
+                            IndexOperation.UPSERT, ResourceType.LSM_BTREE, dataset.hasMetaPart());
 
             LockThenSearchOperationCallbackFactory searchCallbackFactory = new LockThenSearchOperationCallbackFactory(
                     jobId, datasetId, primaryKeyFields, txnSubsystemProvider, ResourceType.LSM_BTREE);
@@ -2601,7 +2601,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
                             ResourceType.LSM_INVERTED_INDEX)
                     : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
                             modificationCallbackPrimaryKeyFields, txnSubsystemProvider, IndexOperation.UPSERT,
-                            ResourceType.LSM_INVERTED_INDEX);
+                            ResourceType.LSM_INVERTED_INDEX, dataset.hasMetaPart());
 
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                     DatasetUtils.getMergePolicyFactory(dataset, mdTxnCtx);
@@ -2666,8 +2666,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
                     Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0), secondaryKeyExprs.get(0), recType);
             IAType spatialType = keyPairType.first;
 
-            boolean isPointMBR = spatialType.getTypeTag() == ATypeTag.POINT
-                    || spatialType.getTypeTag() == ATypeTag.POINT3D;
+            boolean isPointMBR =
+                    spatialType.getTypeTag() == ATypeTag.POINT || spatialType.getTypeTag() == ATypeTag.POINT3D;
             int dimension = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
             int numSecondaryKeys = dimension * 2;
             int numPrimaryKeys = primaryKeys.size();
@@ -2770,12 +2770,12 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
                             ResourceType.LSM_RTREE)
                     : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
                             modificationCallbackPrimaryKeyFields, txnSubsystemProvider, IndexOperation.UPSERT,
-                            ResourceType.LSM_RTREE);
+                            ResourceType.LSM_RTREE, dataset.hasMetaPart());
 
-            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
-                    .getMergePolicyFactory(dataset, mdTxnCtx);
-            IBinaryComparatorFactory[] deletedKeyBTreeCompFactories = getMergedComparatorFactories(comparatorFactories,
-                    primaryComparatorFactories);
+            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
+                    DatasetUtils.getMergePolicyFactory(dataset, mdTxnCtx);
+            IBinaryComparatorFactory[] deletedKeyBTreeCompFactories =
+                    getMergedComparatorFactories(comparatorFactories, primaryComparatorFactories);
             IIndexDataflowHelperFactory idff = new LSMRTreeWithAntiMatterTuplesDataflowHelperFactory(
                     valueProviderFactories, RTreePolicyType.RTREE, deletedKeyBTreeCompFactories,
                     new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), compactionInfo.first,
@@ -2922,7 +2922,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
                             ResourceType.LSM_BTREE)
                     : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
                             modificationCallbackPrimaryKeyFields, txnSubsystemProvider, IndexOperation.UPSERT,
-                            ResourceType.LSM_BTREE);
+                            ResourceType.LSM_BTREE, dataset.hasMetaPart());
 
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                     DatasetUtils.getMergePolicyFactory(dataset, mdTxnCtx);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/df088705/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
index 65c9a49..2a3467e 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
@@ -40,7 +40,6 @@ public abstract class AbstractIndexModificationOperationCallback extends Abstrac
     protected final byte resourceType;
     protected final IndexOperation indexOp;
     protected final ITransactionSubsystem txnSubsystem;
-    protected final SimpleTupleWriter tupleWriter;
     protected final ILogRecord logRecord;
 
     protected AbstractIndexModificationOperationCallback(int datasetId, int[] primaryKeyFields,
@@ -51,7 +50,6 @@ public abstract class AbstractIndexModificationOperationCallback extends Abstrac
         this.resourceType = resourceType;
         this.indexOp = indexOp;
         this.txnSubsystem = txnSubsystem;
-        tupleWriter = new SimpleTupleWriter();
         logRecord = new LogRecord();
         logRecord.setTxnCtx(txnCtx);
         logRecord.setLogType(LogType.UPDATE);
@@ -62,17 +60,23 @@ public abstract class AbstractIndexModificationOperationCallback extends Abstrac
         logRecord.setNewOp((byte) (indexOp.ordinal()));
     }
 
-    protected void log(int PKHash, ITupleReference newValue) throws ACIDException {
+    protected void log(int PKHash, ITupleReference newValue, ITupleReference oldValue) throws ACIDException {
         logRecord.setPKHashValue(PKHash);
         logRecord.setPKFields(primaryKeyFields);
         logRecord.setPKValue(newValue);
         logRecord.computeAndSetPKValueSize();
         if (newValue != null) {
-            logRecord.setNewValueSize(tupleWriter.bytesRequired(newValue));
+            logRecord.setNewValueSize(SimpleTupleWriter.INSTANCE.bytesRequired(newValue));
             logRecord.setNewValue(newValue);
         } else {
             logRecord.setNewValueSize(0);
         }
+        if (oldValue != null) {
+            logRecord.setOldValueSize(SimpleTupleWriter.INSTANCE.bytesRequired(oldValue));
+            logRecord.setOldValue(oldValue);
+        } else {
+            logRecord.setOldValueSize(0);
+        }
         logRecord.computeAndSetLogSize();
         txnSubsystem.getLogManager().log(logRecord);
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/df088705/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
index 4bde490..5b89bb5 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
@@ -40,13 +40,16 @@ public class PrimaryIndexModificationOperationCallback extends AbstractIndexModi
         implements IModificationOperationCallback {
 
     private final AsterixLSMInsertDeleteOperatorNodePushable operatorNodePushable;
+    private final boolean logBeforeImage;
 
     public PrimaryIndexModificationOperationCallback(int datasetId, int[] primaryKeyFields, ITransactionContext txnCtx,
             ILockManager lockManager, ITransactionSubsystem txnSubsystem, long resourceId, int resourcePartition,
-            byte resourceType, IndexOperation indexOp, IOperatorNodePushable operatorNodePushable) {
+            byte resourceType, IndexOperation indexOp, IOperatorNodePushable operatorNodePushable,
+            boolean logBeforeImage) {
         super(datasetId, primaryKeyFields, txnCtx, lockManager, txnSubsystem, resourceId, resourcePartition,
                 resourceType, indexOp);
         this.operatorNodePushable = (AsterixLSMInsertDeleteOperatorNodePushable) operatorNodePushable;
+        this.logBeforeImage = logBeforeImage;
     }
 
     @Override
@@ -99,7 +102,11 @@ public class PrimaryIndexModificationOperationCallback extends AbstractIndexModi
     public void found(ITupleReference before, ITupleReference after) throws HyracksDataException {
         try {
             int pkHash = computePrimaryKeyHashValue(after, primaryKeyFields);
-            log(pkHash, after);
+            if (logBeforeImage) {
+                log(pkHash, after, before);
+            } else {
+                log(pkHash, after, null);
+            }
         } catch (ACIDException e) {
             throw new HyracksDataException(e);
         }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/df088705/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
index c406812..52e2818 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
@@ -43,11 +43,14 @@ public class PrimaryIndexModificationOperationCallbackFactory extends AbstractOp
 
     private static final long serialVersionUID = 1L;
     private final IndexOperation indexOp;
+    private final boolean logBeforeImage;
 
     public PrimaryIndexModificationOperationCallbackFactory(JobId jobId, int datasetId, int[] primaryKeyFields,
-            ITransactionSubsystemProvider txnSubsystemProvider, IndexOperation indexOp, byte resourceType) {
+            ITransactionSubsystemProvider txnSubsystemProvider, IndexOperation indexOp, byte resourceType,
+            boolean logBeforeImage) {
         super(jobId, datasetId, primaryKeyFields, txnSubsystemProvider, resourceType);
         this.indexOp = indexOp;
+        this.logBeforeImage = logBeforeImage;
     }
 
     @Override
@@ -56,8 +59,8 @@ public class PrimaryIndexModificationOperationCallbackFactory extends AbstractOp
             throws HyracksDataException {
 
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
-        IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
-                .getDatasetLifecycleManager();
+        IIndexLifecycleManager indexLifeCycleManager =
+                txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager();
         ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourcePath);
         if (index == null) {
             throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
@@ -67,7 +70,7 @@ public class PrimaryIndexModificationOperationCallbackFactory extends AbstractOp
             ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId, false);
             IModificationOperationCallback modCallback = new PrimaryIndexModificationOperationCallback(datasetId,
                     primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resourceId,
-                    resourcePartition, resourceType, indexOp, operatorNodePushable);
+                    resourcePartition, resourceType, indexOp, operatorNodePushable, logBeforeImage);
             txnCtx.registerIndexAndCallback(resourceId, index, (AbstractOperationCallback) modCallback, true);
             return modCallback;
         } catch (ACIDException e) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/df088705/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java
index 8044d90..974e631 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java
@@ -37,13 +37,15 @@ public class SecondaryIndexModificationOperationCallback extends AbstractIndexMo
         implements IModificationOperationCallback {
 
     protected final IndexOperation oldOp;
+    private final boolean logBeforeImage;
 
     public SecondaryIndexModificationOperationCallback(int datasetId, int[] primaryKeyFields,
             ITransactionContext txnCtx, ILockManager lockManager, ITransactionSubsystem txnSubsystem, long resourceId,
-            int resourcePartition, byte resourceType, IndexOperation indexOp) {
+            int resourcePartition, byte resourceType, IndexOperation indexOp, boolean logBeforeImage) {
         super(datasetId, primaryKeyFields, txnCtx, lockManager, txnSubsystem, resourceId, resourcePartition,
                 resourceType, indexOp);
         oldOp = (indexOp == IndexOperation.DELETE) ? IndexOperation.INSERT : IndexOperation.DELETE;
+        this.logBeforeImage = logBeforeImage;
     }
 
     @Override
@@ -55,7 +57,7 @@ public class SecondaryIndexModificationOperationCallback extends AbstractIndexMo
     public void found(ITupleReference before, ITupleReference after) throws HyracksDataException {
         try {
             int pkHash = computePrimaryKeyHashValue(after, primaryKeyFields);
-            this.log(pkHash, after);
+            this.log(pkHash, after, logBeforeImage ? before : null);
         } catch (ACIDException e) {
             throw new HyracksDataException(e);
         }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/df088705/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
index 168da99..c6743dd 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
@@ -40,11 +40,14 @@ public class SecondaryIndexModificationOperationCallbackFactory extends Abstract
 
     private static final long serialVersionUID = 1L;
     private final IndexOperation indexOp;
+    private final boolean logBeforeImage;
 
     public SecondaryIndexModificationOperationCallbackFactory(JobId jobId, int datasetId, int[] primaryKeyFields,
-            ITransactionSubsystemProvider txnSubsystemProvider, IndexOperation indexOp, byte resourceType) {
+            ITransactionSubsystemProvider txnSubsystemProvider, IndexOperation indexOp, byte resourceType,
+            boolean logBeforeImage) {
         super(jobId, datasetId, primaryKeyFields, txnSubsystemProvider, resourceType);
         this.indexOp = indexOp;
+        this.logBeforeImage = logBeforeImage;
     }
 
     @Override
@@ -63,7 +66,7 @@ public class SecondaryIndexModificationOperationCallbackFactory extends Abstract
             ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId, false);
             IModificationOperationCallback modCallback = new SecondaryIndexModificationOperationCallback(datasetId,
                     primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resourceId,
-                    resourcePartition, resourceType, indexOp);
+                    resourcePartition, resourceType, indexOp, logBeforeImage);
             txnCtx.registerIndexAndCallback(resourceId, index, (AbstractOperationCallback) modCallback, false);
             return modCallback;
         } catch (ACIDException e) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/df088705/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallback.java
index f98083a..13d2d57 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallback.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallback.java
@@ -29,12 +29,14 @@ import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 
 public class UpsertOperationCallback extends AbstractIndexModificationOperationCallback
         implements IModificationOperationCallback {
+    private final boolean logBeforeImage;
 
     public UpsertOperationCallback(int datasetId, int[] primaryKeyFields, ITransactionContext txnCtx,
             ILockManager lockManager, ITransactionSubsystem txnSubsystem, long resourceId, int resourcePartition,
-            byte resourceType, IndexOperation indexOp) {
+            byte resourceType, IndexOperation indexOp, boolean logBeforeImage) {
         super(datasetId, primaryKeyFields, txnCtx, lockManager, txnSubsystem, resourceId, resourcePartition,
                 resourceType, indexOp);
+        this.logBeforeImage = logBeforeImage;
     }
 
     @Override
@@ -46,7 +48,7 @@ public class UpsertOperationCallback extends AbstractIndexModificationOperationC
     public void found(ITupleReference before, ITupleReference after) throws HyracksDataException {
         try {
             int pkHash = computePrimaryKeyHashValue(after, primaryKeyFields);
-            log(pkHash, after);
+            log(pkHash, after, logBeforeImage ? before : null);
         } catch (ACIDException e) {
             throw new HyracksDataException(e);
         }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/df088705/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
index 87cb8e7..5bf1505 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
@@ -39,11 +39,14 @@ public class UpsertOperationCallbackFactory extends AbstractOperationCallbackFac
 
     private static final long serialVersionUID = 1L;
     private final IndexOperation indexOp;
+    private final boolean logBeforeImage;
 
     public UpsertOperationCallbackFactory(JobId jobId, int datasetId, int[] primaryKeyFields,
-            ITransactionSubsystemProvider txnSubsystemProvider, IndexOperation indexOp, byte resourceType) {
+            ITransactionSubsystemProvider txnSubsystemProvider, IndexOperation indexOp, byte resourceType,
+            boolean logBeforeImage) {
         super(jobId, datasetId, primaryKeyFields, txnSubsystemProvider, resourceType);
         this.indexOp = indexOp;
+        this.logBeforeImage = logBeforeImage;
     }
 
     @Override
@@ -52,8 +55,8 @@ public class UpsertOperationCallbackFactory extends AbstractOperationCallbackFac
             throws HyracksDataException {
 
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
-        IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
-                .getDatasetLifecycleManager();
+        IIndexLifecycleManager indexLifeCycleManager =
+                txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager();
         ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourceName);
         if (index == null) {
             throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
@@ -61,9 +64,9 @@ public class UpsertOperationCallbackFactory extends AbstractOperationCallbackFac
 
         try {
             ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId, false);
-            IModificationOperationCallback modCallback = new UpsertOperationCallback(datasetId, primaryKeyFields,
-                    txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resourceId, resourcePartition, resourceType,
-                    indexOp);
+            IModificationOperationCallback modCallback =
+                    new UpsertOperationCallback(datasetId, primaryKeyFields, txnCtx, txnSubsystem.getLockManager(),
+                            txnSubsystem, resourceId, resourcePartition, resourceType, indexOp, logBeforeImage);
             txnCtx.registerIndexAndCallback(resourceId, index, (AbstractOperationCallback) modCallback, true);
             return modCallback;
         } catch (ACIDException e) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/df088705/asterixdb/pom.xml
----------------------------------------------------------------------
diff --git a/asterixdb/pom.xml b/asterixdb/pom.xml
index f474690..7f5c612 100644
--- a/asterixdb/pom.xml
+++ b/asterixdb/pom.xml
@@ -199,7 +199,7 @@
                   <pluginExecutionFilter>
                     <groupId>org.apache.maven.plugins</groupId>
                     <artifactId>maven-plugin-plugin</artifactId>
-                    <versionRange>[3.4,)</versionRange>
+                    <versionRange>[3.3,)</versionRange>
                     <goals>
                       <goal>descriptor</goal>
                     </goals>

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/df088705/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/SimpleTupleWriter.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/SimpleTupleWriter.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/SimpleTupleWriter.java
index 11b0010..7aaa983 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/SimpleTupleWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/SimpleTupleWriter.java
@@ -22,11 +22,18 @@ package org.apache.hyracks.storage.am.common.tuples;
 import java.nio.ByteBuffer;
 
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleReference;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
 
+/*
+ * This class should be replaced by a Util class
+ */
 public class SimpleTupleWriter implements ITreeIndexTupleWriter {
 
+    public static final SimpleTupleWriter INSTANCE = new SimpleTupleWriter();
+
+    private SimpleTupleWriter() {
+    }
+
     // Write short in little endian to target byte array at given offset.
     private static void writeShortL(short s, byte[] buf, int targetOff) {
         buf[targetOff] = (byte) (s >> 8);
@@ -52,7 +59,7 @@ public class SimpleTupleWriter implements ITreeIndexTupleWriter {
     }
 
     @Override
-    public ITreeIndexTupleReference createTupleReference() {
+    public SimpleTupleReference createTupleReference() {
         return new SimpleTupleReference();
     }
 
@@ -103,7 +110,7 @@ public class SimpleTupleWriter implements ITreeIndexTupleWriter {
     }
 
     protected int getNullFlagsBytes(ITupleReference tuple) {
-        return (int) Math.ceil((double) tuple.getFieldCount() / 8.0);
+        return (int) Math.ceil(tuple.getFieldCount() / 8.0);
     }
 
     protected int getFieldSlotsBytes(ITupleReference tuple) {
@@ -111,7 +118,7 @@ public class SimpleTupleWriter implements ITreeIndexTupleWriter {
     }
 
     protected int getNullFlagsBytes(ITupleReference tuple, int startField, int numFields) {
-        return (int) Math.ceil((double) numFields / 8.0);
+        return (int) Math.ceil(numFields / 8.0);
     }
 
     protected int getFieldSlotsBytes(ITupleReference tuple, int startField, int numFields) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/df088705/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/SimpleTupleWriterFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/SimpleTupleWriterFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/SimpleTupleWriterFactory.java
deleted file mode 100644
index be0688a..0000000
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/SimpleTupleWriterFactory.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hyracks.storage.am.common.tuples;
-
-import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
-import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleWriterFactory;
-
-public class SimpleTupleWriterFactory implements ITreeIndexTupleWriterFactory {
-
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public ITreeIndexTupleWriter createTupleWriter() {
-		return new SimpleTupleWriter();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/df088705/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index b58cc29..8ddab88 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -511,8 +511,8 @@ public class LSMHarness implements ILSMHarness {
 
     protected void triggerReplication(List<ILSMComponent> lsmComponents, boolean bulkload, LSMOperationType opType)
             throws HyracksDataException {
-        ILSMIndexAccessorInternal accessor = lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE,
-                NoOpOperationCallback.INSTANCE);
+        ILSMIndexAccessorInternal accessor =
+                lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
         accessor.scheduleReplication(lsmComponents, bulkload, opType);
     }
 


Mime
View raw message