asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject [3/4] asterixdb git commit: Add Test NodeController, Test Data Generator, and Marker Logs
Date Fri, 22 Jul 2016 19:37:02 GMT
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestTupleReference.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestTupleReference.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestTupleReference.java
new file mode 100644
index 0000000..b676dfd
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestTupleReference.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.common;
+
+import org.apache.hyracks.data.std.util.GrowableArray;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+public class TestTupleReference implements ITupleReference {
+    private GrowableArray[] fields;
+    private int[] offsets;
+
+    public TestTupleReference(GrowableArray[] fields) {
+        this.fields = fields;
+        offsets = new int[fields.length];
+    }
+
+    public TestTupleReference(int numfields) {
+        this.fields = new GrowableArray[numfields];
+        for (int i = 0; i < numfields; i++) {
+            fields[i] = new GrowableArray();
+        }
+        offsets = new int[fields.length];
+    }
+
+    public GrowableArray[] getFields() {
+        return fields;
+    }
+
+    public void setFields(GrowableArray[] fields) {
+        this.fields = fields;
+    }
+
+    @Override
+    public int getFieldCount() {
+        return fields.length;
+    }
+
+    @Override
+    public byte[] getFieldData(int fIdx) {
+        return fields[fIdx].getByteArray();
+    }
+
+    @Override
+    public int getFieldStart(int fIdx) {
+        return offsets[fIdx];
+    }
+
+    @Override
+    public int getFieldLength(int fIdx) {
+        return fields[fIdx].getLength();
+    }
+
+    public void reset() {
+        for (GrowableArray field : fields) {
+            field.reset();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java
index 2f712cc..a253ac0 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java
@@ -36,6 +36,7 @@ import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
 import org.apache.hyracks.api.test.TestFrameWriter;
+import org.apache.hyracks.api.util.HyracksConstants;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -44,6 +45,7 @@ import org.apache.hyracks.dataflow.common.data.marshalling.DoubleSerializerDeser
 import org.apache.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;
 import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
+import org.apache.hyracks.dataflow.common.util.TaskUtils;
 import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningWithMessageConnectorDescriptor;
 import org.apache.hyracks.dataflow.std.connectors.PartitionWithMessageDataWriter;
 import org.apache.hyracks.test.support.TestUtils;
@@ -70,7 +72,7 @@ public class ConnectorDescriptorWithMessagingTest {
             IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
             VSizeFrame message = new VSizeFrame(ctx);
             VSizeFrame tempBuffer = new VSizeFrame(ctx);
-            ctx.setSharedObject(message);
+            TaskUtils.putInSharedMap(HyracksConstants.KEY_MESSAGE, message, ctx);
             message.getBuffer().clear();
             message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
             message.getBuffer().flip();
@@ -144,8 +146,8 @@ public class ConnectorDescriptorWithMessagingTest {
             IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
             VSizeFrame message = new VSizeFrame(ctx);
             VSizeFrame tempBuffer = new VSizeFrame(ctx);
-            ctx.setSharedObject(message);
-            writeRandomMessage(message, MessagingFrameTupleAppender.SNAPSHOT_MESSAGE, DEFAULT_FRAME_SIZE + 1);
+            TaskUtils.putInSharedMap(HyracksConstants.KEY_MESSAGE, message, ctx);
+            writeRandomMessage(message, MessagingFrameTupleAppender.MARKER_MESSAGE, DEFAULT_FRAME_SIZE + 1);
             ISerializerDeserializer<?>[] serdes = new ISerializerDeserializer<?>[] {
                     Integer64SerializerDeserializer.INSTANCE, DoubleSerializerDeserializer.INSTANCE,
                     BooleanSerializerDeserializer.INSTANCE, new UTF8StringSerializerDeserializer() };
@@ -165,7 +167,7 @@ public class ConnectorDescriptorWithMessagingTest {
                 fta.reset(writer.getLastFrame());
                 Assert.assertEquals(fta.getTupleCount(), 1);
                 FeedUtils.processFeedMessage(writer.getLastFrame(), tempBuffer, fta);
-                Assert.assertEquals(MessagingFrameTupleAppender.SNAPSHOT_MESSAGE,
+                Assert.assertEquals(MessagingFrameTupleAppender.MARKER_MESSAGE,
                         MessagingFrameTupleAppender.getMessageType(tempBuffer));
             }
             message.getBuffer().clear();
@@ -228,9 +230,9 @@ public class ConnectorDescriptorWithMessagingTest {
             IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
             VSizeFrame message = new VSizeFrame(ctx);
             VSizeFrame tempBuffer = new VSizeFrame(ctx);
-            ctx.setSharedObject(message);
+            TaskUtils.putInSharedMap(HyracksConstants.KEY_MESSAGE, message, ctx);
             message.getBuffer().clear();
-            writeRandomMessage(message, MessagingFrameTupleAppender.SNAPSHOT_MESSAGE, DEFAULT_FRAME_SIZE);
+            writeRandomMessage(message, MessagingFrameTupleAppender.MARKER_MESSAGE, DEFAULT_FRAME_SIZE);
             ISerializerDeserializer<?>[] serdes = new ISerializerDeserializer<?>[] {
                     Integer64SerializerDeserializer.INSTANCE, DoubleSerializerDeserializer.INSTANCE,
                     BooleanSerializerDeserializer.INSTANCE, new UTF8StringSerializerDeserializer() };
@@ -264,7 +266,7 @@ public class ConnectorDescriptorWithMessagingTest {
                 fta.reset(writer.getLastFrame());
                 Assert.assertEquals(fta.getTupleCount(), 1);
                 FeedUtils.processFeedMessage(writer.getLastFrame(), tempBuffer, fta);
-                Assert.assertEquals(MessagingFrameTupleAppender.SNAPSHOT_MESSAGE,
+                Assert.assertEquals(MessagingFrameTupleAppender.MARKER_MESSAGE,
                         MessagingFrameTupleAppender.getMessageType(tempBuffer));
             }
             partitioner.close();
@@ -286,7 +288,7 @@ public class ConnectorDescriptorWithMessagingTest {
             IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
             VSizeFrame message = new VSizeFrame(ctx);
             VSizeFrame tempBuffer = new VSizeFrame(ctx);
-            ctx.setSharedObject(message);
+            TaskUtils.putInSharedMap(HyracksConstants.KEY_MESSAGE, message, ctx);
             message.getBuffer().clear();
             message.getBuffer().put(MessagingFrameTupleAppender.ACK_REQ_FEED_MESSAGE);
             message.getBuffer().flip();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
new file mode 100644
index 0000000..a0ef31e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.dataflow;
+
+import java.io.File;
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.asterix.app.bootstrap.TestNodeController;
+import org.apache.asterix.app.data.gen.TestTupleCounterFrameWriter;
+import org.apache.asterix.app.data.gen.TupleGenerator;
+import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.dataflow.AsterixLSMInsertDeleteOperatorNodePushable;
+import org.apache.asterix.common.transactions.DatasetId;
+import org.apache.asterix.common.transactions.ILogRecord;
+import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.external.util.DataflowUtils;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails.PartitioningStrategy;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.transaction.management.service.logging.LogReader;
+import org.apache.commons.io.FileUtils;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.test.CountAnswer;
+import org.apache.hyracks.api.test.FrameWriterTestUtils;
+import org.apache.hyracks.api.test.FrameWriterTestUtils.FrameWriterOperation;
+import org.apache.hyracks.api.util.HyracksConstants;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
+import org.apache.hyracks.dataflow.common.util.TaskUtils;
+import org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelper;
+import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTree;
+import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class LogMarkerTest {
+
+    private static final String TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml";
+    private static final IAType[] KEY_TYPES = { BuiltinType.AINT32 };
+    private static final ARecordType RECORD_TYPE = new ARecordType("TestRecordType", new String[] { "key", "value" },
+            new IAType[] { BuiltinType.AINT32, BuiltinType.AINT64 }, false);
+    private static final GenerationFunction[] RECORD_GEN_FUNCTION =
+            { GenerationFunction.DETERMINISTIC, GenerationFunction.DETERMINISTIC };
+    private static final boolean[] UNIQUE_RECORD_FIELDS = { true, false };
+    private static final ARecordType META_TYPE = null;
+    private static final GenerationFunction[] META_GEN_FUNCTION = null;
+    private static final boolean[] UNIQUE_META_FIELDS = null;
+    private static final int[] KEY_INDEXES = { 0 };
+    private static final int[] KEY_INDICATORS = { 0 };
+    private static final int NUM_OF_RECORDS = 100000;
+    private static final int SNAPSHOT_SIZE = 1000;
+    private static final int DATASET_ID = 101;
+    private static final String SPILL_AREA = "target" + File.separator + "spill_area";
+    private static final String DATAVERSE_NAME = "TestDV";
+    private static final String DATASET_NAME = "TestDS";
+    private static final String DATA_TYPE_NAME = "DUMMY";
+    private static final String NODE_GROUP_NAME = "DEFAULT";
+
+    @Before
+    public void setUp() throws Exception {
+        System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, TEST_CONFIG_FILE_NAME);
+        System.out.println("SetUp: ");
+        File f = new File(System.getProperty("user.dir") + File.separator + "target" + File.separator + "txnLogDir");
+        FileUtils.deleteQuietly(f);
+        System.out.println("Dir " + f.getName() + " deleted");
+        f = new File(System.getProperty("user.dir") + File.separator + "target" + File.separator + "IODevice");
+        FileUtils.deleteQuietly(f);
+        System.out.println("Dir " + f.getName() + " deleted");
+        f = new File(System.getProperty("user.dir") + File.separator + SPILL_AREA);
+        FileUtils.deleteQuietly(f);
+        System.out.println("Dir " + f.getName() + " deleted");
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        System.out.println("TearDown");
+        File f = new File(System.getProperty("user.dir") + File.separator + "target" + File.separator + "txnLogDir");
+        FileUtils.deleteQuietly(f);
+        System.out.println("Dir " + f.getName() + " deleted");
+        f = new File(System.getProperty("user.dir") + File.separator + "target" + File.separator + "IODevice");
+        FileUtils.deleteQuietly(f);
+        System.out.println("Dir " + f.getName() + " deleted");
+        f = new File(System.getProperty("user.dir") + File.separator + SPILL_AREA);
+        FileUtils.deleteQuietly(f);
+        System.out.println("Dir " + f.getName() + " deleted");
+    }
+
+    @Test
+    public void testInsertWithSnapshot() {
+        try {
+            TestNodeController nc = new TestNodeController();
+            nc.init();
+            Dataset dataset = new Dataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME,
+                    NODE_GROUP_NAME, null, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
+                            Collections.emptyList(), null, null, null, false, null, false),
+                    null, DatasetType.INTERNAL, DATASET_ID, 0);
+            try {
+                nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(), null,
+                        null);
+                IHyracksTaskContext ctx = nc.createTestContext();
+                nc.newJobId();
+                ITransactionContext txnCtx = nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(), true);
+                AsterixLSMInsertDeleteOperatorNodePushable insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES,
+                        RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(), null, null);
+                insertOp.open();
+                TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
+                        RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+                VSizeFrame frame = new VSizeFrame(ctx);
+                VSizeFrame marker = new VSizeFrame(ctx);
+                FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
+                long markerId = 0L;
+                for (int j = 0; j < NUM_OF_RECORDS; j++) {
+                    if (j % SNAPSHOT_SIZE == 0) {
+                        marker.reset();
+                        marker.getBuffer().put(MessagingFrameTupleAppender.MARKER_MESSAGE);
+                        marker.getBuffer().putLong(markerId);
+                        marker.getBuffer().flip();
+                        markerId++;
+                        TaskUtils.putInSharedMap(HyracksConstants.KEY_MESSAGE, marker, ctx);
+                        tupleAppender.flush(insertOp);
+                    }
+                    ITupleReference tuple = tupleGenerator.next();
+                    DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
+                }
+                if (tupleAppender.getTupleCount() > 0) {
+                    tupleAppender.write(insertOp, true);
+                }
+                insertOp.close();
+                nc.getTransactionManager().completedTransaction(txnCtx, new DatasetId(-1), -1, true);
+                LSMBTreeDataflowHelper dataflowHelper = nc.getPrimaryIndexDataflowHelper(dataset, KEY_TYPES,
+                        RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(), null, null);
+                dataflowHelper.open();
+                LSMBTree btree = (LSMBTree) dataflowHelper.getIndexInstance();
+                long lsn = btree.getMostRecentMarkerLSN();
+                int numOfMarkers = 0;
+                LogReader logReader = (LogReader) nc.getTransactionSubsystem().getLogManager().getLogReader(false);
+                long expectedMarkerId = markerId - 1;
+                while (lsn >= 0) {
+                    numOfMarkers++;
+                    ILogRecord logRecord = logReader.read(lsn);
+                    lsn = logRecord.getPreviousMarkerLSN();
+                    long logMarkerId = logRecord.getMarker().getLong();
+                    Assert.assertEquals(expectedMarkerId, logMarkerId);
+                    expectedMarkerId--;
+                }
+                logReader.close();
+                dataflowHelper.close();
+                Assert.assertEquals(markerId, numOfMarkers);
+                nc.newJobId();
+                TestTupleCounterFrameWriter countOp = create(nc.getSearchOutputDesc(KEY_TYPES, RECORD_TYPE, META_TYPE),
+                        Collections.emptyList(), Collections.emptyList(), false);
+                IPushRuntime emptyTupleOp = nc.getFullScanPipeline(countOp, ctx, dataset, KEY_TYPES, RECORD_TYPE,
+                        META_TYPE, new NoMergePolicyFactory(), null, null);
+                emptyTupleOp.open();
+                emptyTupleOp.close();
+                Assert.assertEquals(NUM_OF_RECORDS, countOp.getCount());
+            } finally {
+                nc.deInit();
+            }
+        } catch (Throwable e) {
+            e.printStackTrace();
+            Assert.fail(e.getMessage());
+        }
+
+    }
+
+    public static TestTupleCounterFrameWriter create(RecordDescriptor recordDescriptor,
+            Collection<FrameWriterOperation> exceptionThrowingOperations,
+            Collection<FrameWriterOperation> errorThrowingOperations, boolean deepCopyInputFrames) {
+        CountAnswer openAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.Open,
+                exceptionThrowingOperations, errorThrowingOperations);
+        CountAnswer nextAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.NextFrame,
+                exceptionThrowingOperations, errorThrowingOperations);
+        CountAnswer flushAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.Flush,
+                exceptionThrowingOperations, errorThrowingOperations);
+        CountAnswer failAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.Fail,
+                exceptionThrowingOperations, errorThrowingOperations);
+        CountAnswer closeAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.Close,
+                exceptionThrowingOperations, errorThrowingOperations);
+        return new TestTupleCounterFrameWriter(recordDescriptor, openAnswer, nextAnswer, flushAnswer, failAnswer,
+                closeAnswer, deepCopyInputFrames);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestRecordDescriptorFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestRecordDescriptorFactory.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestRecordDescriptorFactory.java
deleted file mode 100644
index 536bf3a..0000000
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestRecordDescriptorFactory.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.test.dataflow;
-
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-
-public class TestRecordDescriptorFactory {
-    public RecordDescriptor createRecordDescriptor(ISerializerDeserializer<?>... serdes) {
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-common/pom.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/pom.xml b/asterixdb/asterix-common/pom.xml
index fc1c221..f6c0c99 100644
--- a/asterixdb/asterix-common/pom.xml
+++ b/asterixdb/asterix-common/pom.xml
@@ -252,6 +252,11 @@
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <version>1.2.17</version>
+    </dependency>
   </dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index b3eb281..71c30d5 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -79,7 +79,7 @@ public class PrimaryIndexOperationTracker extends BaseOperationTracker {
     @Override
     public synchronized void completeOperation(ILSMIndex index, LSMOperationType opType,
             ISearchOperationCallback searchCallback, IModificationOperationCallback modificationCallback)
-                    throws HyracksDataException {
+            throws HyracksDataException {
         if (opType == LSMOperationType.MODIFICATION || opType == LSMOperationType.FORCE_MODIFICATION) {
             decrementNumActiveOperations(modificationCallback);
             if (numActiveOperations.get() == 0) {
@@ -148,12 +148,12 @@ public class PrimaryIndexOperationTracker extends BaseOperationTracker {
         for (ILSMIndex lsmIndex : dsInfo.getDatasetIndexes()) {
 
             //get resource
-            ILSMIndexAccessor accessor = lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE,
-                    NoOpOperationCallback.INSTANCE);
+            ILSMIndexAccessor accessor =
+                    lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
 
             //update resource lsn
-            AbstractLSMIOOperationCallback ioOpCallback = (AbstractLSMIOOperationCallback) lsmIndex
-                    .getIOOperationCallback();
+            AbstractLSMIOOperationCallback ioOpCallback =
+                    (AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback();
             ioOpCallback.updateLastLSN(logRecord.getLSN());
 
             //schedule flush after update

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
index 9a76b40..cf66d30 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
@@ -22,6 +22,8 @@ import java.nio.ByteBuffer;
 
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.common.exceptions.FrameDataException;
+import org.apache.asterix.common.transactions.ILogMarkerCallback;
+import org.apache.asterix.common.transactions.PrimaryIndexLogMarkerCallback;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -31,6 +33,7 @@ import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
 import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import org.apache.hyracks.dataflow.common.util.TaskUtils;
 import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
 import org.apache.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
@@ -41,7 +44,9 @@ import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
 
 public class AsterixLSMInsertDeleteOperatorNodePushable extends LSMIndexInsertUpdateDeleteOperatorNodePushable {
 
+    public static final String KEY_INDEX = "Index";
     private final boolean isPrimary;
+    // This class has both lsmIndex and index (in super class) pointing to the same object
     private AbstractLSMIndex lsmIndex;
     private int i = 0;
 
@@ -59,10 +64,6 @@ public class AsterixLSMInsertDeleteOperatorNodePushable extends LSMIndexInsertUp
     private int currentTupleIdx;
     private int lastFlushedTupleIdx;
 
-    public boolean isPrimary() {
-        return isPrimary;
-    }
-
     public AsterixLSMInsertDeleteOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
             int partition, int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider, IndexOperation op,
             boolean isPrimary) {
@@ -79,6 +80,10 @@ public class AsterixLSMInsertDeleteOperatorNodePushable extends LSMIndexInsertUp
         indexHelper.open();
         lsmIndex = (AbstractLSMIndex) indexHelper.getIndexInstance();
         try {
+            if (isPrimary && ctx.getSharedObject() != null) {
+                PrimaryIndexLogMarkerCallback callback = new PrimaryIndexLogMarkerCallback(lsmIndex);
+                TaskUtils.putInSharedMap(ILogMarkerCallback.KEY_MARKER_CALLBACK, callback, ctx);
+            }
             writer.open();
             modCallback = opDesc.getModificationOpCallbackFactory().createModificationOperationCallback(
                     indexHelper.getResourcePath(), indexHelper.getResourceID(), indexHelper.getResourcePartition(),
@@ -185,4 +190,8 @@ public class AsterixLSMInsertDeleteOperatorNodePushable extends LSMIndexInsertUp
             writer.fail();
         }
     }
+
+    public boolean isPrimary() {
+        return isPrimary;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogMarkerCallback.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogMarkerCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogMarkerCallback.java
new file mode 100644
index 0000000..11d649b
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogMarkerCallback.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.transactions;
+
+import java.nio.ByteBuffer;
+
+/**
+ * This interface provides a callback mechanism for adding marker logs to the transaction log file
+ */
+public interface ILogMarkerCallback {
+
+    String KEY_MARKER_CALLBACK = "MARKER_CALLBACK";
+
+    /**
+     * Called before writing the marker log, allowing addition of specific information to the log record
+     *
+     * @param buffer
+     *            the log buffer to write to
+     */
+    void before(ByteBuffer buffer);
+
+    /**
+     * Called after the log has been appended to the log tail, passing the position of the log used for random access
+     *
+     * @param lsn the position of the appended log, usable for random access
+     */
+    void after(long lsn);
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
index cd05ba0..29af931 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
@@ -32,11 +32,38 @@ public interface ILogRecord {
         LARGE_RECORD
     }
 
-    public static final int JOB_TERMINATE_LOG_SIZE = 14; //JOB_COMMIT or ABORT log type
-    public static final int ENTITY_COMMIT_LOG_BASE_SIZE = 30;
-    public static final int UPDATE_LOG_BASE_SIZE = 51;
-    public static final int FLUSH_LOG_SIZE = 18;
-    public static final int WAIT_LOG_SIZE = 14;
+    public static final int CHKSUM_LEN = Long.BYTES;
+    public static final int FLDCNT_LEN = Integer.BYTES;
+    public static final int DS_LEN = Integer.BYTES;
+    public static final int LOG_SOURCE_LEN = Byte.BYTES;
+    public static final int LOGRCD_SZ_LEN = Integer.BYTES;
+    public static final int NEWOP_LEN = Byte.BYTES;
+    public static final int NEWVALSZ_LEN = Integer.BYTES;
+    public static final int PKHASH_LEN = Integer.BYTES;
+    public static final int PKSZ_LEN = Integer.BYTES;
+    public static final int PRVLSN_LEN = Long.BYTES;
+    public static final int RS_PARTITION_LEN = Integer.BYTES;
+    public static final int RSID_LEN = Long.BYTES;
+    public static final int SEQ_NUM_LEN = Long.BYTES;
+    public static final int TYPE_LEN = Byte.BYTES;
+    public static final int UUID_LEN = Long.BYTES;
+    public static final int VBUCKET_ID_LEN = Short.BYTES;
+
+    public static final int ALL_RECORD_HEADER_LEN = LOG_SOURCE_LEN + TYPE_LEN + JobId.BYTES;
+    public static final int ENTITYCOMMIT_UPDATE_HEADER_LEN = RS_PARTITION_LEN + DatasetId.BYTES + PKHASH_LEN + PKSZ_LEN;
+    public static final int UPDATE_LSN_HEADER = RSID_LEN + LOGRCD_SZ_LEN;
+    public static final int UPDATE_BODY_HEADER = FLDCNT_LEN + NEWOP_LEN + NEWVALSZ_LEN;
+    // What are these fields? vvvvv
+    public static final int REMOTE_FLUSH_LOG_EXTRA_FIELDS_LEN = Long.BYTES + Integer.BYTES + Integer.BYTES;
+
+    // How are the following computed?
+    public static final int JOB_TERMINATE_LOG_SIZE = ALL_RECORD_HEADER_LEN + CHKSUM_LEN;
+    public static final int ENTITY_COMMIT_LOG_BASE_SIZE = 30; // ALL_RECORD_HEADER_LEN + CHKSUM_LEN +?
+    public static final int UPDATE_LOG_BASE_SIZE = 51; // ALL_RECORD_HEADER_LEN + CHKSUM_LEN +?
+    public static final int FLUSH_LOG_SIZE = 18; // ALL_RECORD_HEADER_LEN + CHKSUM_LEN +?
+    public static final int WAIT_LOG_SIZE = ALL_RECORD_HEADER_LEN + CHKSUM_LEN;
+    public static final int MARKER_BASE_LOG_SIZE =
+            ALL_RECORD_HEADER_LEN + CHKSUM_LEN + DS_LEN + RS_PARTITION_LEN + PRVLSN_LEN + LOGRCD_SZ_LEN;
 
     public RecordReadStatus readLogRecord(ByteBuffer buffer);
 
@@ -135,7 +162,15 @@ public interface ILogRecord {
 
     public ITupleReference getOldValue();
 
-    public void setOldValue(ITupleReference oldValue);
+    public void setOldValue(ITupleReference tupleBefore);
 
-    public void setOldValueSize(int oldValueSize);
+    public void setOldValueSize(int beforeSize);
+
+    public boolean isMarker();
+
+    public ByteBuffer getMarker();
+
+    public void logAppended(long lsn);
+
+    public long getPreviousMarkerLSN();
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
index 23fdd0f..306b888 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
@@ -71,24 +71,6 @@ import org.apache.hyracks.storage.am.common.tuples.SimpleTupleWriter;
 
 public class LogRecord implements ILogRecord {
 
-    private static final int LOG_SOURCE_LEN = Byte.BYTES;
-    private static final int TYPE_LEN = Byte.BYTES;
-    public static final int PKHASH_LEN = Integer.BYTES;
-    public static final int PKSZ_LEN = Integer.BYTES;
-    private static final int RS_PARTITION_LEN = Integer.BYTES;
-    private static final int RSID_LEN = Long.BYTES;
-    private static final int LOGRCD_SZ_LEN = Integer.BYTES;
-    private static final int FLDCNT_LEN = Integer.BYTES;
-    private static final int NEWOP_LEN = Byte.BYTES;
-    private static final int NEWVALSZ_LEN = Integer.BYTES;
-    private static final int CHKSUM_LEN = Long.BYTES;
-
-    private static final int ALL_RECORD_HEADER_LEN = LOG_SOURCE_LEN + TYPE_LEN + JobId.BYTES;
-    private static final int ENTITYCOMMIT_UPDATE_HEADER_LEN = RS_PARTITION_LEN + DatasetId.BYTES + PKHASH_LEN
-            + PKSZ_LEN;
-    private static final int UPDATE_LSN_HEADER = RSID_LEN + LOGRCD_SZ_LEN;
-    private static final int UPDATE_BODY_HEADER = FLDCNT_LEN + NEWOP_LEN + NEWVALSZ_LEN;
-
     // ------------- fields in a log record (begin) ------------//
     private byte logSource;
     private byte logType;
@@ -108,8 +90,10 @@ public class LogRecord implements ILogRecord {
     private ITupleReference oldValue;
     private int oldValueFieldCount;
     private long checksum;
+    private long prevMarkerLSN;
+    private ByteBuffer marker;
     // ------------- fields in a log record (end) --------------//
-
+    private final ILogMarkerCallback callback; // A callback for log mark operations
     private int PKFieldCnt;
     private ITransactionContext txnCtx;
     private long LSN;
@@ -129,7 +113,8 @@ public class LogRecord implements ILogRecord {
     private String nodeId;
     private boolean replicated = false;
 
-    public LogRecord() {
+    public LogRecord(ILogMarkerCallback callback) {
+        this.callback = callback;
         isFlushed = new AtomicBoolean(false);
         readPKValue = new PrimaryKeyTupleReference();
         readNewValue = SimpleTupleWriter.INSTANCE.createTupleReference();
@@ -138,49 +123,70 @@ public class LogRecord implements ILogRecord {
         logSource = LogSource.LOCAL;
     }
 
-    private void writeLogRecordCommonFields(ByteBuffer buffer) {
+    public LogRecord() {
+        this(null);
+    }
+
+    private void doWriteLogRecord(ByteBuffer buffer) {
         buffer.put(logSource);
         buffer.put(logType);
         buffer.putInt(jobId);
-        if (logType == LogType.UPDATE || logType == LogType.ENTITY_COMMIT || logType == LogType.UPSERT_ENTITY_COMMIT) {
-            buffer.putInt(resourcePartition);
-            buffer.putInt(datasetId);
-            buffer.putInt(PKHashValue);
-            if (PKValueSize <= 0) {
-                throw new IllegalStateException("Primary Key Size is less than or equal to 0");
-            }
-            buffer.putInt(PKValueSize);
-            writePKValue(buffer);
-        }
-        if (logType == LogType.UPDATE) {
-            buffer.putLong(resourceId);
-            buffer.putInt(logSize);
-            buffer.putInt(newValueFieldCount);
-            buffer.put(newOp);
-            buffer.putInt(newValueSize);
-            writeTuple(buffer, newValue, newValueSize);
-            if (oldValueSize > 0) {
-                buffer.putInt(oldValueSize);
-                buffer.putInt(oldValueFieldCount);
-                writeTuple(buffer, oldValue, oldValueSize);
-            }
+        switch (logType) {
+            case LogType.ENTITY_COMMIT:
+            case LogType.UPSERT_ENTITY_COMMIT:
+                writeEntityInfo(buffer);
+                break;
+            case LogType.UPDATE:
+                writeEntityInfo(buffer);
+                buffer.putLong(resourceId);
+                buffer.putInt(logSize);
+                buffer.putInt(newValueFieldCount);
+                buffer.put(newOp);
+                buffer.putInt(newValueSize);
+                writeTuple(buffer, newValue, newValueSize);
+                if (oldValueSize > 0) {
+                    buffer.putInt(oldValueSize);
+                    buffer.putInt(oldValueFieldCount);
+                    writeTuple(buffer, oldValue, oldValueSize);
+                }
+                break;
+            case LogType.FLUSH:
+                buffer.putInt(datasetId);
+                break;
+            case LogType.MARKER:
+                buffer.putInt(datasetId);
+                buffer.putInt(resourcePartition);
+                callback.before(buffer);
+                buffer.putInt(logSize);
+                buffer.put(marker);
+                break;
+            default:
+                // Do nothing
         }
-        if (logType == LogType.FLUSH) {
-            buffer.putInt(datasetId);
+    }
+
+    private void writeEntityInfo(ByteBuffer buffer) {
+        buffer.putInt(resourcePartition);
+        buffer.putInt(datasetId);
+        buffer.putInt(PKHashValue);
+        if (PKValueSize <= 0) {
+            throw new IllegalStateException("Primary Key Size is less than or equal to 0");
         }
+        buffer.putInt(PKValueSize);
+        writePKValue(buffer);
     }
 
     @Override
     public void writeLogRecord(ByteBuffer buffer) {
         int beginOffset = buffer.position();
-        writeLogRecordCommonFields(buffer);
+        doWriteLogRecord(buffer);
         checksum = generateChecksum(buffer, beginOffset, logSize - CHKSUM_LEN);
         buffer.putLong(checksum);
     }
 
     @Override
     public void writeRemoteLogRecord(ByteBuffer buffer) {
-        writeLogRecordCommonFields(buffer);
+        doWriteLogRecord(buffer);
         if (logType == LogType.FLUSH) {
             buffer.putLong(LSN);
             buffer.putInt(numOfFlushedIndexes);
@@ -222,7 +228,7 @@ public class LogRecord implements ILogRecord {
         int beginOffset = buffer.position();
 
         //read common fields
-        RecordReadStatus status = readLogCommonFields(buffer);
+        RecordReadStatus status = doReadLogRecord(buffer);
         if (status != RecordReadStatus.OK) {
             buffer.position(beginOffset);
             return status;
@@ -241,7 +247,7 @@ public class LogRecord implements ILogRecord {
         return RecordReadStatus.OK;
     }
 
-    private RecordReadStatus readLogCommonFields(ByteBuffer buffer) {
+    private RecordReadStatus doReadLogRecord(ByteBuffer buffer) {
         //first we need the logtype and Job ID, if the buffer isn't that big, then no dice.
         if (buffer.remaining() < ALL_RECORD_HEADER_LEN) {
             return RecordReadStatus.TRUNCATED;
@@ -255,64 +261,88 @@ public class LogRecord implements ILogRecord {
                     return RecordReadStatus.TRUNCATED;
                 }
                 datasetId = buffer.getInt();
-                resourceId = 0L;
+                resourceId = 0l;
+                // fall through
+            case LogType.WAIT:
+                computeAndSetLogSize();
                 break;
-            case LogType.ABORT:
             case LogType.JOB_COMMIT:
+            case LogType.ABORT:
                 datasetId = -1;
                 PKHashValue = -1;
+                computeAndSetLogSize();
                 break;
             case LogType.ENTITY_COMMIT:
             case LogType.UPSERT_ENTITY_COMMIT:
-                if (!readEntityInfo(buffer)) {
+                if (readEntityInfo(buffer)) {
+                    computeAndSetLogSize();
+                } else {
                     return RecordReadStatus.TRUNCATED;
                 }
                 break;
             case LogType.UPDATE:
-                if (!readEntityInfo(buffer)) {
+                if (readEntityInfo(buffer)) {
+                    if (buffer.remaining() < UPDATE_LSN_HEADER + UPDATE_BODY_HEADER) {
+                        return RecordReadStatus.TRUNCATED;
+                    }
+                    resourceId = buffer.getLong();
+                    logSize = buffer.getInt();
+                    newValueFieldCount = buffer.getInt();
+                    newOp = buffer.get();
+                    newValueSize = buffer.getInt();
+                    if (buffer.remaining() < newValueSize) {
+                        if (logSize > buffer.capacity()) {
+                            return RecordReadStatus.LARGE_RECORD;
+                        }
+                        return RecordReadStatus.TRUNCATED;
+                    }
+                    newValue = readTuple(buffer, readNewValue, newValueFieldCount, newValueSize);
+                    if (logSize > getUpdateLogSizeWithoutOldValue()) {
+                        // Prev Image exists
+                        if (buffer.remaining() < Integer.BYTES) {
+                            return RecordReadStatus.TRUNCATED;
+                        }
+                        oldValueSize = buffer.getInt();
+                        if (buffer.remaining() < Integer.BYTES) {
+                            return RecordReadStatus.TRUNCATED;
+                        }
+                        oldValueFieldCount = buffer.getInt();
+                        if (buffer.remaining() < oldValueSize) {
+                            return RecordReadStatus.TRUNCATED;
+                        }
+                        oldValue = readTuple(buffer, readOldValue, oldValueFieldCount, oldValueSize);
+                    } else {
+                        oldValueSize = 0;
+                    }
+                } else {
                     return RecordReadStatus.TRUNCATED;
                 }
-                if (buffer.remaining() < UPDATE_LSN_HEADER + UPDATE_BODY_HEADER) {
+                break;
+            case LogType.MARKER:
+                if (buffer.remaining() < DS_LEN + RS_PARTITION_LEN + PRVLSN_LEN + LOGRCD_SZ_LEN) {
                     return RecordReadStatus.TRUNCATED;
                 }
-                resourceId = buffer.getLong();
+                datasetId = buffer.getInt();
+                resourcePartition = buffer.getInt();
+                prevMarkerLSN = buffer.getLong();
                 logSize = buffer.getInt();
-                newValueFieldCount = buffer.getInt();
-                newOp = buffer.get();
-                newValueSize = buffer.getInt();
-                return readEntity(buffer);
+                int lenRemaining = logSize - MARKER_BASE_LOG_SIZE;
+                if (buffer.remaining() < lenRemaining) {
+                    return RecordReadStatus.TRUNCATED;
+                }
+
+                if (marker == null || marker.capacity() < lenRemaining) {
+                    // TODO(amoudi): account for memory allocation
+                    marker = ByteBuffer.allocate(lenRemaining + Short.BYTES);
+                }
+                marker.clear();
+                buffer.get(marker.array(), 0, lenRemaining);
+                marker.position(lenRemaining);
+                marker.flip();
+                break;
             default:
                 break;
         }
-        computeAndSetLogSize();
-        return RecordReadStatus.OK;
-    }
-
-    private RecordReadStatus readEntity(ByteBuffer buffer) {
-        if (buffer.remaining() < newValueSize) {
-            if (logSize > buffer.capacity()) {
-                return RecordReadStatus.LARGE_RECORD;
-            }
-            return RecordReadStatus.TRUNCATED;
-        }
-        newValue = readTuple(buffer, readNewValue, newValueFieldCount, newValueSize);
-        if (logSize > getUpdateLogSizeWithoutOldValue()) {
-            // Prev Image exists
-            if (buffer.remaining() < Integer.BYTES) {
-                return RecordReadStatus.TRUNCATED;
-            }
-            oldValueSize = buffer.getInt();
-            if (buffer.remaining() < Integer.BYTES) {
-                return RecordReadStatus.TRUNCATED;
-            }
-            oldValueFieldCount = buffer.getInt();
-            if (buffer.remaining() < oldValueSize) {
-                return RecordReadStatus.TRUNCATED;
-            }
-            oldValue = readTuple(buffer, readOldValue, oldValueFieldCount, oldValueSize);
-        } else {
-            oldValueSize = 0;
-        }
         return RecordReadStatus.OK;
     }
 
@@ -339,7 +369,7 @@ public class LogRecord implements ILogRecord {
     @Override
     public void readRemoteLog(ByteBuffer buffer) {
         //read common fields
-        readLogCommonFields(buffer);
+        doReadLogRecord(buffer);
 
         if (logType == LogType.FLUSH) {
             LSN = buffer.getLong();
@@ -412,11 +442,18 @@ public class LogRecord implements ILogRecord {
             case LogType.WAIT:
                 logSize = WAIT_LOG_SIZE;
                 break;
+            case LogType.MARKER:
+                setMarkerLogSize();
+                break;
             default:
                 throw new IllegalStateException("Unsupported Log Type");
         }
     }
 
+    private void setMarkerLogSize() {
+        logSize = MARKER_BASE_LOG_SIZE + marker.remaining();
+    }
+
     @Override
     public String getLogRecordForDisplay() {
         StringBuilder builder = new StringBuilder();
@@ -688,4 +725,28 @@ public class LogRecord implements ILogRecord {
     public void setOldValueSize(int oldValueSize) {
         this.oldValueSize = oldValueSize;
     }
+
+    public void setMarker(ByteBuffer marker) {
+        this.marker = marker;
+    }
+
+    @Override
+    public boolean isMarker() {
+        return logType == LogType.MARKER;
+    }
+
+    @Override
+    public void logAppended(long lsn) {
+        callback.after(lsn);
+    }
+
+    @Override
+    public long getPreviousMarkerLSN() {
+        return prevMarkerLSN;
+    }
+
+    @Override
+    public ByteBuffer getMarker() {
+        return marker;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java
index 714b8f7..269e4b9 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java
@@ -27,6 +27,7 @@ public class LogType {
     public static final byte FLUSH = 4;
     public static final byte UPSERT_ENTITY_COMMIT = 5;
     public static final byte WAIT = 6;
+    public static final byte MARKER = 7;
 
     private static final String STRING_UPDATE = "UPDATE";
     private static final String STRING_JOB_COMMIT = "JOB_COMMIT";
@@ -35,8 +36,8 @@ public class LogType {
     private static final String STRING_FLUSH = "FLUSH";
     private static final String STRING_UPSERT_ENTITY_COMMIT = "UPSERT_ENTITY_COMMIT";
     private static final String STRING_WAIT = "WAIT";
-
-    private static final String STRING_INVALID_LOG_TYPE = "INVALID_LOG_TYPE";
+    private static final String STRING_MARKER = "MARKER";
+    private static final String STRING_UNKNOWN_LOG_TYPE = "UNKNOWN_LOG_TYPE";
 
     public static String toString(byte logType) {
         switch (logType) {
@@ -54,8 +55,10 @@ public class LogType {
                 return STRING_UPSERT_ENTITY_COMMIT;
             case LogType.WAIT:
                 return STRING_WAIT;
+            case LogType.MARKER:
+                return STRING_MARKER;
             default:
-                return STRING_INVALID_LOG_TYPE;
+                return STRING_UNKNOWN_LOG_TYPE;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java
new file mode 100644
index 0000000..7dae65f
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.transactions;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
+
+/**
+ * A basic callback used to write markers to the transaction logs
+ */
+public class PrimaryIndexLogMarkerCallback implements ILogMarkerCallback {
+
+    private AbstractLSMIndex index;
+
+    /**
+     * @param index:
+     *            a pointer to the primary index used to store marker log info
+     * @throws HyracksDataException
+     */
+    public PrimaryIndexLogMarkerCallback(AbstractLSMIndex index) throws HyracksDataException {
+        this.index = index;
+    }
+
+    @Override
+    public void before(ByteBuffer buffer) {
+        buffer.putLong(index.getCurrentMemoryComponent().getMostRecentMarkerLSN());
+    }
+
+    @Override
+    public void after(long lsn) {
+        index.getCurrentMemoryComponent().setMostRecentMarkerLSN(lsn);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/AsterixConstants.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/AsterixConstants.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/AsterixConstants.java
new file mode 100644
index 0000000..4bca216
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/AsterixConstants.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.utils;
+
+/**
+ * A static class that stores asterix constants
+ */
+public class AsterixConstants {
+    public static final String ASTERIX_ROOT_METADATA_DIR = "asterix_root_metadata";
+
+    private AsterixConstants() {
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/FrameStack.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/FrameStack.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/FrameStack.java
new file mode 100644
index 0000000..8c83687
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/FrameStack.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.utils;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.ArrayDeque;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Not thread safe stack that is used to store fixed size buffers in memory
+ * Once memory is consumed, it uses disk to store buffers
+ */
+public class FrameStack implements Closeable {
+    private static final AtomicInteger stackIdGenerator = new AtomicInteger(0);
+    private static final String STACK_FILE_NAME = "stack";
+    private final int stackId;
+    private final int frameSize;
+    private final int numOfMemoryFrames;
+    private final ArrayDeque<ByteBuffer> fullBuffers;
+    private final ArrayDeque<ByteBuffer> emptyBuffers;
+    private int totalWriteCount = 0;
+    private int totalReadCount = 0;
+    private final File file;
+    private final RandomAccessFile iostream;
+    private final byte[] frame;
+
+    /**
+     * Create a hybrid of memory and disk stack of byte buffers
+     *
+     * @param dir
+     * @param frameSize
+     * @param numOfMemoryFrames
+     * @throws HyracksDataException
+     * @throws FileNotFoundException
+     */
+    public FrameStack(String dir, int frameSize, int numOfMemoryFrames)
+            throws HyracksDataException, FileNotFoundException {
+        this.stackId = stackIdGenerator.getAndIncrement();
+        this.frameSize = frameSize;
+        this.numOfMemoryFrames = numOfMemoryFrames;
+        this.fullBuffers = numOfMemoryFrames <= 0 ? null : new ArrayDeque<>();
+        this.emptyBuffers = numOfMemoryFrames <= 0 ? null : new ArrayDeque<>();
+        this.file = StoragePathUtil.createFile(
+                ((dir == null) ? "" : (dir.endsWith(File.separator) ? dir : (dir + File.separator))) + STACK_FILE_NAME,
+                stackId);
+        this.iostream = new RandomAccessFile(file, "rw");
+        this.frame = new byte[frameSize];
+    }
+
+    /**
+     * @return the number of remaining frames to be read in the stack
+     */
+    public int remaining() {
+        return totalWriteCount - totalReadCount;
+    }
+
+    /**
+     * copy content of buffer into the stack
+     *
+     * @param buffer
+     * @throws IOException
+     */
+    public synchronized void push(ByteBuffer buffer) throws IOException {
+        int diff = totalWriteCount - totalReadCount;
+        if (diff < numOfMemoryFrames) {
+            ByteBuffer aBuffer = allocate();
+            aBuffer.put(buffer.array());
+            aBuffer.flip();
+            fullBuffers.push(aBuffer);
+        } else {
+            long position = (long) (diff - numOfMemoryFrames) * frameSize;
+            if (position != iostream.getFilePointer()) {
+                iostream.seek(position);
+            }
+            iostream.write(buffer.array());
+        }
+        totalWriteCount++;
+    }
+
+    private ByteBuffer allocate() {
+        ByteBuffer aBuffer = emptyBuffers.poll();
+        if (aBuffer == null) {
+            aBuffer = ByteBuffer.allocate(frameSize);
+        }
+        aBuffer.clear();
+        return aBuffer;
+    }
+
+    /**
+     * Pop the most recently pushed frame and copy it into dest (dest is left empty when the stack is empty)
+     *
+     * @param dest the buffer to copy the frame into; it is cleared first and flipped for reading before returning
+     * @throws IOException if seeking in or reading from the spill file fails
+     */
+    public synchronized void pop(ByteBuffer dest) throws IOException {
+        dest.clear();
+        int diff = totalWriteCount - totalReadCount - 1;
+        if (diff >= 0) {
+            totalReadCount++; // count every pop, memory or disk, so remaining() and later positions stay correct
+            if (diff < numOfMemoryFrames) {
+                ByteBuffer aBuffer = fullBuffers.pop();
+                emptyBuffers.push(aBuffer);
+                dest.put(aBuffer.array());
+            } else {
+                long position = (long) (diff - numOfMemoryFrames) * frameSize;
+                iostream.seek(position);
+                iostream.readFully(frame);
+                dest.put(frame);
+            }
+        }
+        dest.flip();
+    }
+
+    /**
+     * Closing this stack will result in the data being deleted
+     *
+     * @throws IOException
+     */
+    @Override
+    public void close() throws IOException {
+        iostream.close();
+        Files.delete(file.toPath());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
index 78b06fb..615e8af 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
@@ -24,12 +24,16 @@ import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
 import org.apache.hyracks.dataflow.std.file.FileSplit;
 import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
 
 public class StoragePathUtil {
+    private static final Logger LOGGER = Logger.getLogger(StoragePathUtil.class.getName());
     public static final String PARTITION_DIR_PREFIX = "partition_";
     public static final String TEMP_DATASETS_STORAGE_FOLDER = "temp";
     public static final String DATASET_INDEX_NAME_SEPARATOR = "_idx_";
@@ -70,4 +74,39 @@ public class StoragePathUtil {
     public static int getPartitionNumFromName(String name) {
         return Integer.parseInt(name.substring(PARTITION_DIR_PREFIX.length()));
     }
+
+    /**
+     * Create a file
+     * Note: this method is not thread safe. It is the responsibility of the caller to ensure no path conflict when
+     * creating files simultaneously
+     *
+     * @param name
+     * @param count
+     * @return
+     * @throws HyracksDataException
+     */
+    public static File createFile(String name, int count) throws HyracksDataException {
+        try {
+            String fileName = name + "_" + count;
+            File file = new File(fileName);
+            if (file.getParentFile() != null) {
+                file.getParentFile().mkdirs();
+            }
+            if (!file.exists()) {
+                boolean success = file.createNewFile();
+                if (!success) {
+                    throw new HyracksDataException("Unable to create spill file " + fileName);
+                } else {
+                    if (LOGGER.isEnabledFor(Level.INFO)) {
+                        LOGGER.info("Created spill file " + file.getAbsolutePath());
+                    }
+                }
+            } else {
+                throw new HyracksDataException("spill file " + fileName + " already exists");
+            }
+            return file;
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java
index 1d5b15e..2878d5a 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java
@@ -18,9 +18,12 @@
  */
 package org.apache.asterix.common.utils;
 
+import java.nio.ByteBuffer;
+
 import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.LogRecord;
+import org.apache.asterix.common.transactions.LogSource;
 import org.apache.asterix.common.transactions.LogType;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 
@@ -64,4 +67,17 @@ public class TransactionUtil {
         logRecord.computeAndSetPKValueSize();
         logRecord.computeAndSetLogSize();
     }
+
+    public static void formMarkerLogRecord(LogRecord logRecord, ITransactionContext txnCtx, int datasetId,
+            int resourcePartition, ByteBuffer marker) {
+        logRecord.setTxnCtx(txnCtx);
+        logRecord.setLogSource(LogSource.LOCAL);
+        logRecord.setLogType(LogType.MARKER);
+        logRecord.setJobId(txnCtx.getJobId().getId());
+        logRecord.setDatasetId(datasetId);
+        logRecord.setResourcePartition(resourcePartition);
+        marker.get(); // read the first byte since it is not part of the marker object
+        logRecord.setMarker(marker);
+        logRecord.computeAndSetLogSize();
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/util/AsterixConstants.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/util/AsterixConstants.java b/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/util/AsterixConstants.java
deleted file mode 100644
index 11a2510..0000000
--- a/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/util/AsterixConstants.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.event.util;
-
-public class AsterixConstants {
-
-    public static String ASTERIX_ROOT_METADATA_DIR = "asterix_root_metadata";
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java b/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java
index b19a722..1780a51 100644
--- a/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java
+++ b/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.asterix.common.utils.AsterixConstants;
 import org.apache.asterix.event.driver.EventDriver;
 import org.apache.asterix.event.error.VerificationUtil;
 import org.apache.asterix.event.model.AsterixInstance;
@@ -72,8 +73,8 @@ public class PatternCreator {
 
         for (Node node : cluster.getNode()) {
             if (copyHyracksToNC) {
-                Pattern copyHyracksForNC = createCopyHyracksPattern(asterixInstanceName, cluster, node.getClusterIp(),
-                        destDir);
+                Pattern copyHyracksForNC =
+                        createCopyHyracksPattern(asterixInstanceName, cluster, node.getClusterIp(), destDir);
                 ps.add(copyHyracksForNC);
             }
         }
@@ -389,8 +390,8 @@ public class PatternCreator {
         Nodeid nodeid = new Nodeid(new Value(null, EventDriver.CLIENT_NODE.getId()));
         String username = cluster.getUsername() != null ? cluster.getUsername() : System.getProperty("user.name");
         String workingDir = cluster.getWorkingDir().getDir();
-        String destDir = workingDir + File.separator + "library" + File.separator + dataverse + File.separator
-                + libraryName;
+        String destDir =
+                workingDir + File.separator + "library" + File.separator + dataverse + File.separator + libraryName;
         String fileToTransfer = new File(libraryPath).getAbsolutePath();
 
         Iterator<Node> installTargets = cluster.getNode().iterator();
@@ -434,8 +435,8 @@ public class PatternCreator {
         patternList.add(p);
 
         Iterator<Node> uninstallTargets = cluster.getNode().iterator();
-        String libDir = workingDir + File.separator + "library" + File.separator + dataverse + File.separator
-                + libraryName;
+        String libDir =
+                workingDir + File.separator + "library" + File.separator + dataverse + File.separator + libraryName;
         Node uninstallNode = uninstallTargets.next();
         nodeid = new Nodeid(new Value(null, uninstallNode.getId()));
         event = new Event("file_delete", nodeid, libDir);
@@ -606,8 +607,8 @@ public class PatternCreator {
         String username = cluster.getUsername() == null ? System.getProperty("user.name") : cluster.getUsername();
         String srcHost = cluster.getMasterNode().getClientIp();
         Nodeid nodeid = new Nodeid(new Value(null, EventDriver.CLIENT_NODE.getId()));
-        String srcDir = cluster.getMasterNode().getLogDir() == null ? cluster.getLogDir()
-                : cluster.getMasterNode().getLogDir();
+        String srcDir =
+                cluster.getMasterNode().getLogDir() == null ? cluster.getLogDir() : cluster.getMasterNode().getLogDir();
         String destDir = outputDir + File.separator + "cc";
         String pargs = username + " " + srcHost + " " + srcDir + " " + destDir;
         Event event = new Event("directory_copy", nodeid, pargs);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IFeedMarker.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IFeedMarker.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IFeedMarker.java
new file mode 100644
index 0000000..487b47d
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IFeedMarker.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.api;
+
+import org.apache.hyracks.api.comm.VSizeFrame;
+
+public interface IFeedMarker {
+
+    /**
+     * Mark the frame with a mark denoting the progress of the feed
+     * The mark will be eventually written to the transaction log
+     *
+     * @param mark
+     *            a frame to write the progress mark in
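+     * @return {@code true} if a mark was written into the frame (the caller then flushes it), {@code false} otherwise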
+     */
+    public boolean mark(VSizeFrame mark);
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java
index 0f5ada4..9d9ff28 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java
@@ -20,6 +20,7 @@ package org.apache.asterix.external.api;
 
 import java.io.IOException;
 
+@FunctionalInterface
 public interface IRecordConverter<I, O> {
 
     public O convert(IRawRecord<? extends I> input) throws IOException;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
index 9cce1c9..08ffe18 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
@@ -73,4 +73,8 @@ public interface IRecordReader<T> extends Closeable {
      * gives the record reader a chance to recover from IO errors during feed intake
      */
     public boolean handleException(Throwable th);
+
+    public default IFeedMarker getProgressReporter() {
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
index a301ac9..7806489 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
@@ -34,9 +34,9 @@ public class ChangeFeedDataFlowController<T> extends FeedRecordDataFlowControlle
 
     public ChangeFeedDataFlowController(final IHyracksTaskContext ctx, final FeedTupleForwarder tupleForwarder,
             final FeedLogManager feedLogManager, final int numOfOutputFields,
-            final IRecordWithPKDataParser<T> dataParser, final IRecordReader<T> recordReader)
+            final IRecordWithPKDataParser<T> dataParser, final IRecordReader<T> recordReader, boolean sendMarker)
             throws HyracksDataException {
-        super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader);
+        super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader, sendMarker);
         this.dataParser = dataParser;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
index b47d278..7d65c52 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
@@ -32,9 +32,9 @@ public class ChangeFeedWithMetaDataFlowController<T> extends FeedWithMetaDataFlo
 
     public ChangeFeedWithMetaDataFlowController(final IHyracksTaskContext ctx, final FeedTupleForwarder tupleForwarder,
             final FeedLogManager feedLogManager, final int numOfOutputFields,
-            final IRecordWithMetadataParser<T> dataParser, final IRecordReader<T> recordReader)
-                    throws HyracksDataException {
-        super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader);
+            final IRecordWithMetadataParser<T> dataParser, final IRecordReader<T> recordReader, boolean sendMarker)
+            throws HyracksDataException {
+        super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader, sendMarker);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index 87daffa..be9056b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -19,10 +19,15 @@
 package org.apache.asterix.external.dataflow;
 
 import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.annotation.Nonnull;
 
+import org.apache.asterix.external.api.IFeedMarker;
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.api.IRecordDataParser;
 import org.apache.asterix.external.api.IRecordReader;
@@ -30,9 +35,13 @@ import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataExceptionUtils;
 import org.apache.asterix.external.util.FeedLogManager;
 import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.HyracksConstants;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
+import org.apache.hyracks.dataflow.common.util.TaskUtils;
 import org.apache.log4j.Logger;
 
 public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowController {
@@ -40,38 +49,52 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl
     protected final IRecordDataParser<T> dataParser;
     protected final IRecordReader<? extends T> recordReader;
     protected final AtomicBoolean closed = new AtomicBoolean(false);
-    protected final long interval = 1000;
+    protected static final long INTERVAL = 1000;
+    protected final Object mutex = new Object();
+    protected final boolean sendMarker;
     protected boolean failed = false;
 
     public FeedRecordDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder,
             @Nonnull FeedLogManager feedLogManager, int numOfOutputFields, @Nonnull IRecordDataParser<T> dataParser,
-            @Nonnull IRecordReader<T> recordReader) throws HyracksDataException {
+            @Nonnull IRecordReader<T> recordReader, boolean sendMarker) throws HyracksDataException {
         super(ctx, tupleForwarder, feedLogManager, numOfOutputFields);
         this.dataParser = dataParser;
         this.recordReader = recordReader;
+        this.sendMarker = sendMarker;
         recordReader.setFeedLogManager(feedLogManager);
         recordReader.setController(this);
     }
 
     @Override
     public void start(IFrameWriter writer) throws HyracksDataException {
+        ExecutorService executorService = sendMarker ? Executors.newSingleThreadExecutor() : null;
+        Future<?> result = null;
+        if (sendMarker) {
+            DataflowMarker dataflowMarker = new DataflowMarker(recordReader.getProgressReporter(),
+                    TaskUtils.<VSizeFrame> get(HyracksConstants.KEY_MESSAGE, ctx));
+            result = executorService.submit(dataflowMarker);
+        }
         HyracksDataException hde = null;
         try {
             failed = false;
             tupleForwarder.initialize(ctx, writer);
             while (recordReader.hasNext()) {
-                IRawRecord<? extends T> record = recordReader.next();
-                if (record == null) {
-                    flush();
-                    Thread.sleep(interval);
-                    continue;
+                // synchronized on mutex before we call next() so we don't send a marker before its record
+                synchronized (mutex) {
+                    IRawRecord<? extends T> record = recordReader.next();
+                    if (record == null) {
+                        flush();
+                        wait(INTERVAL);
+                        continue;
+                    }
+                    tb.reset();
+                    parseAndForward(record);
                 }
-                tb.reset();
-                parseAndForward(record);
             }
         } catch (InterruptedException e) {
             //TODO: Find out what could cause an interrupted exception beside termination of a job/feed
-            LOGGER.warn("Feed has been interrupted. Closing the feed");
+            LOGGER.warn("Feed has been interrupted. Closing the feed", e);
+            Thread.currentThread().interrupt();
         } catch (Exception e) {
             failed = true;
             tupleForwarder.flush();
@@ -90,10 +113,13 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl
             hde = ExternalDataExceptionUtils.suppressIntoHyracksDataException(hde, th);
         } finally {
             closeSignal();
-            if (hde != null) {
-                throw hde;
+            if (sendMarker && result != null) {
+                result.cancel(true);
             }
         }
+        if (hde != null) {
+            throw hde;
+        }
     }
 
     private void parseAndForward(IRawRecord<? extends T> record) throws IOException {
@@ -170,4 +196,53 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl
         // This is not a parser record. most likely, this error happened in the record reader.
         return recordReader.handleException(th);
     }
+
+    private class DataflowMarker implements Runnable {
+        private final IFeedMarker marker;
+        private final VSizeFrame mark;
+        private volatile boolean stopped = false;
+
+        public DataflowMarker(IFeedMarker marker, VSizeFrame mark) {
+            this.marker = marker;
+            this.mark = mark;
+        }
+
+        public synchronized void stop() {
+            stopped = true;
+            notify();
+        }
+
+        @Override
+        public void run() {
+            try {
+                while (true) {
+                    synchronized (this) {
+                        if (!stopped) {
+                            // TODO (amoudi): find a better reactive way to do this
+                            // sleep for two seconds
+                            wait(TimeUnit.SECONDS.toMillis(2));
+                        } else {
+                            break;
+                        }
+                    }
+                    synchronized (mutex) {
+                        if (marker.mark(mark)) {
+                            // broadcast
+                            tupleForwarder.flush();
+                            // clear
+                            mark.getBuffer().clear();
+                            mark.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
+                            mark.getBuffer().flip();
+                        }
+                    }
+                }
+            } catch (InterruptedException e) {
+                LOGGER.warn("Marker stopped", e);
+                Thread.currentThread().interrupt();
+                return;
+            } catch (Exception e) {
+                LOGGER.warn("Marker stopped", e);
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
index f1eb870..36c6c2f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
@@ -30,10 +30,12 @@ import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.HyracksConstants;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
 import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
+import org.apache.hyracks.dataflow.common.util.TaskUtils;
 
 public class FeedTupleForwarder implements ITupleForwarder {
 
@@ -59,7 +61,7 @@ public class FeedTupleForwarder implements ITupleForwarder {
             this.writer = writer;
             this.appender = new FrameTupleAppender(frame);
             // Set null feed message
-            VSizeFrame message = (VSizeFrame) ctx.getSharedObject();
+            VSizeFrame message = TaskUtils.<VSizeFrame> get(HyracksConstants.KEY_MESSAGE, ctx);
             // a null message
             message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
             message.getBuffer().flip();


Mime
View raw message