asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject [1/2] incubator-asterixdb git commit: Add Unit Tests for Feed Runtime Input Handler
Date Thu, 19 May 2016 21:56:36 GMT
Repository: incubator-asterixdb
Updated Branches:
  refs/heads/master f81539522 -> 803a3a2fa


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/803a3a2f/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
new file mode 100644
index 0000000..705d5e3
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
@@ -0,0 +1,794 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.test;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
+import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
+import org.apache.asterix.external.feed.management.ConcurrentFramePool;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.test.FrameWriterTestUtils;
+import org.apache.hyracks.api.test.TestControlledFrameWriter;
+import org.apache.hyracks.api.test.TestFrameWriter;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.test.support.TestUtils;
+import org.junit.Assert;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+@RunWith(PowerMockRunner.class)
+public class InputHandlerTest extends TestCase {
+
+    private static final int DEFAULT_FRAME_SIZE = 32768;
+    private static final int NUM_FRAMES = 128;
+    private static final long FEED_MEM_BUDGET = DEFAULT_FRAME_SIZE * NUM_FRAMES;
+    private static final String DATAVERSE = "dataverse";
+    private static final String DATASET = "dataset";
+    private static final String FEED = "feed";
+    private static final String NODE_ID = "NodeId";
+    private static final float DISCARD_ALLOWANCE = 0.15f;
+    private static final ExecutorService EXECUTOR = Executors.newFixedThreadPool(1);
+    private volatile static HyracksDataException cause = null;
+
+    public InputHandlerTest(String testName) { // JUnit 3 style constructor: hands the test name to TestCase
+        super(testName);
+    }
+
+    public static Test suite() { // wraps this class in a junit.framework.TestSuite
+        return new TestSuite(InputHandlerTest.class);
+    }
+
+    private FeedRuntimeInputHandler createInputHandler(IHyracksTaskContext ctx, IFrameWriter writer,
+            FeedPolicyAccessor fpa, ConcurrentFramePool framePool) throws HyracksDataException {
+        FrameTupleAccessor fta = Mockito.mock(FrameTupleAccessor.class);
+        FeedId feedId = new FeedId(DATAVERSE, FEED);
+        FeedConnectionId connectionId = new FeedConnectionId(feedId, DATASET);
+        FeedRuntimeId runtimeId =
+                new FeedRuntimeId(feedId, FeedRuntimeType.COLLECT, 0, FeedRuntimeId.DEFAULT_TARGET_ID);
+        return new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, writer, fpa, fta, framePool);
+    }
+
+    /*
+     * Testing the following scenarios
+     * 01. Positive Frames memory budget with fixed size frames, no spill, no discard.
+     * 02. Positive Frames memory budget with variable size frames, no spill, no discard.
+     * 03. Positive Frames memory budget with fixed size frames, with spill, no discard.
+     * 04. Positive Frames memory budget with variable size frames, with spill, no discard.
+     * 05. Positive Frames memory budget with fixed size frames, no spill, with discard.
+     * 06. Positive Frames memory budget with variable size frames, no spill, with discard.
+     * 07. Positive Frames memory budget with fixed size frames, with spill, with discard.
+     * 08. Positive Frames memory budget with variable size frames, with spill, with discard.
+     * 09. 0 Frames memory budget with fixed size frames, with spill, no discard.
+     * 10. 0 Frames memory budget with variable size frames, with spill, no discard.
+     * 11. TODO 0 Frames memory budget with fixed size frames, with spill, with discard.
+     * 12. TODO 0 Frames memory budget with variable size frames, with spill, with discard.
+     * 13. TODO Test exception handling with Open, NextFrame, Flush, Close, Fail exception throwing FrameWriter
+     * 14. TODO Test exception while waiting for subscription
+     */
+
+    private static FeedPolicyAccessor createFeedPolicyAccessor(boolean spill, boolean discard, long spillBudget,
+            float discardFraction) {
+        FeedPolicyAccessor fpa = Mockito.mock(FeedPolicyAccessor.class);
+        Mockito.when(fpa.bufferingEnabled()).thenReturn(true);
+        Mockito.when(fpa.spillToDiskOnCongestion()).thenReturn(spill);
+        Mockito.when(fpa.getMaxSpillOnDisk()).thenReturn(spillBudget);
+        Mockito.when(fpa.discardOnCongestion()).thenReturn(discard);
+        Mockito.when(fpa.getMaxFractionDiscard()).thenReturn(discardFraction);
+        return fpa;
+    }
+
+    @org.junit.Test
+    public void testZeroMemoryVarSizeFrameWithDiskNoDiscard() {
+        try {
+            int numRounds = 5;
+            Random random = new Random();
+            IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+            // Spill enabled, discard disabled; spill budget = NUM_FRAMES * DEFAULT_FRAME_SIZE
+            FeedPolicyAccessor fpa =
+                    createFeedPolicyAccessor(true, false, NUM_FRAMES * DEFAULT_FRAME_SIZE, DISCARD_ALLOWANCE);
+            // Plain TestFrameWriter built from two empty lists (no freeze/unfreeze control is used here)
+            TestFrameWriter writer = FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList());
+            // Frame pool created with 0 as its budget argument
+            ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, 0, DEFAULT_FRAME_SIZE);
+            FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
+            handler.open();
+            ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
+            handler.nextFrame(buffer);
+            Assert.assertEquals(0, handler.getNumProcessedInMemory());
+            Assert.assertEquals(1, handler.getNumSpilled());
+            // push NUM_FRAMES * numRounds more frames, each 1 to 10 times DEFAULT_FRAME_SIZE
+            for (int i = 0; i < NUM_FRAMES * numRounds; i++) {
+                int multiplier = random.nextInt(10) + 1;
+                buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * multiplier);
+                handler.nextFrame(buffer);
+            }
+            // Check that no frames were discarded
+            Assert.assertEquals(handler.getNumDiscarded(), 0);
+            // Check that every frame pushed so far was counted as spilled
+            Assert.assertEquals(NUM_FRAMES * numRounds + 1, handler.getNumSpilled());
+            writer.validate(false);
+            handler.close();
+            // Check that the writer's nextFrame count equals the number of frames pushed
+            Assert.assertEquals(NUM_FRAMES * numRounds + 1, writer.nextFrameCount());
+            writer.validate(true);
+        } catch (Throwable th) {
+            th.printStackTrace();
+            Assert.fail();
+        } finally {
+            Assert.assertNull(cause);
+        }
+    }
+
+    @org.junit.Test
+    public void testZeroMemoryFixedSizeFrameWithDiskNoDiscard() {
+        try {
+            int numRounds = 10;
+            IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+            // Spill enabled, discard disabled; spill budget = NUM_FRAMES * DEFAULT_FRAME_SIZE
+            FeedPolicyAccessor fpa =
+                    createFeedPolicyAccessor(true, false, NUM_FRAMES * DEFAULT_FRAME_SIZE, DISCARD_ALLOWANCE);
+            // Plain TestFrameWriter built from two empty lists (no freeze/unfreeze control is used here)
+            TestFrameWriter writer = FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList());
+            // Frame pool created with 0 as its budget argument
+            ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, 0, DEFAULT_FRAME_SIZE);
+            FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
+            handler.open();
+            VSizeFrame frame = new VSizeFrame(ctx);
+            handler.nextFrame(frame.getBuffer());
+            Assert.assertEquals(0, handler.getNumProcessedInMemory());
+            Assert.assertEquals(1, handler.getNumSpilled());
+            // push the same frame buffer NUM_FRAMES * numRounds more times
+            for (int i = 0; i < NUM_FRAMES * numRounds; i++) {
+                handler.nextFrame(frame.getBuffer());
+            }
+            // Check that no frames were discarded
+            Assert.assertEquals(handler.getNumDiscarded(), 0);
+            // Check that every frame pushed so far was counted as spilled
+            Assert.assertEquals(NUM_FRAMES * numRounds + 1, handler.getNumSpilled());
+            writer.validate(false);
+            handler.close();
+            // Check that the writer's nextFrame count equals the number of frames pushed
+            Assert.assertEquals(NUM_FRAMES * numRounds + 1, writer.nextFrameCount());
+            writer.validate(true);
+        } catch (Throwable th) {
+            th.printStackTrace();
+            Assert.fail();
+        } finally {
+            Assert.assertNull(cause);
+        }
+
+    }
+
+    /*
+     * Spill = true;
+     * Discard = true; discard fraction limited to DISCARD_ALLOWANCE
+     * Variable size frames
+     */
+    @org.junit.Test
+    public void testMemoryVarSizeFrameWithSpillWithDiscard() {
+        try {
+            int numberOfMemoryFrames = 50;
+            int numberOfSpillFrames = 50;
+            int notDiscarded = 0;
+            int totalMinFrames = 0;
+            IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+            // Spill budget = Memory budget (50 default-size frames each), discard enabled
+            FeedPolicyAccessor fpa =
+                    createFeedPolicyAccessor(true, true, DEFAULT_FRAME_SIZE * numberOfSpillFrames, DISCARD_ALLOWANCE);
+            // Controlled writer, frozen before any frame is pushed
+            TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE);
+            writer.freeze();
+            // Frame pool sized for numberOfMemoryFrames default-size frames
+            ConcurrentFramePool framePool =
+                    new ConcurrentFramePool(NODE_ID, numberOfMemoryFrames * DEFAULT_FRAME_SIZE, DEFAULT_FRAME_SIZE);
+            FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
+            handler.open();
+            ByteBuffer buffer1 = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
+            ByteBuffer buffer2 = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * 2);
+            ByteBuffer buffer3 = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * 3);
+            ByteBuffer buffer4 = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * 4);
+            ByteBuffer buffer5 = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * 5);
+            while (true) {
+                if (totalMinFrames + 1 < numberOfMemoryFrames) {
+                    handler.nextFrame(buffer1);
+                    notDiscarded++;
+                    totalMinFrames++;
+                } else {
+                    break;
+                }
+                if (totalMinFrames + 2 < numberOfMemoryFrames) {
+                    notDiscarded++;
+                    totalMinFrames += 2;
+                    handler.nextFrame(buffer2);
+                } else {
+                    break;
+                }
+                if (totalMinFrames + 3 < numberOfMemoryFrames) {
+                    notDiscarded++;
+                    totalMinFrames += 3;
+                    handler.nextFrame(buffer3);
+                } else {
+                    break;
+                }
+            }
+            // Now we need to verify that the frame pool memory has been consumed!
+            Assert.assertTrue(framePool.remaining() < 3);
+            Assert.assertEquals(0, handler.getNumSpilled());
+            Assert.assertEquals(0, handler.getNumStalled());
+            Assert.assertEquals(0, handler.getNumDiscarded());
+            while (true) {
+                if (handler.getNumSpilled() < numberOfSpillFrames) {
+                    notDiscarded++;
+                    handler.nextFrame(buffer3);
+                } else {
+                    break;
+                }
+                if (handler.getNumSpilled() < numberOfSpillFrames) {
+                    notDiscarded++;
+                    handler.nextFrame(buffer4);
+                } else {
+                    break;
+                }
+                if (handler.getNumSpilled() < numberOfSpillFrames) {
+                    notDiscarded++;
+                    handler.nextFrame(buffer5);
+                } else {
+                    break;
+                }
+            }
+            Assert.assertTrue(framePool.remaining() < 3);
+            Assert.assertEquals(handler.framesOnDisk(), handler.getNumSpilled());
+            Assert.assertEquals(handler.framesOnDisk(), numberOfSpillFrames);
+            Assert.assertEquals(0, handler.getNumStalled());
+            Assert.assertEquals(0, handler.getNumDiscarded());
+            // Keep pushing while one more discard still stays within fpa.getMaxFractionDiscard()
+            double numDiscarded = 0;
+            boolean nextShouldDiscard =
+                    ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard();
+            while (nextShouldDiscard) {
+                handler.nextFrame(buffer5);
+                numDiscarded++;
+                nextShouldDiscard = ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard();
+            }
+            Assert.assertTrue(framePool.remaining() < 3);
+            Assert.assertEquals(handler.framesOnDisk(), handler.getNumSpilled());
+            Assert.assertEquals(0, handler.getNumStalled());
+            Assert.assertEquals((int) numDiscarded, handler.getNumDiscarded());
+            // Next Call should block since we're exceeding the discard allowance
+            Future<?> result = EXECUTOR.submit(new Pusher(buffer5, handler));
+            if (result.isDone()) {
+                Assert.fail("The producer should switch to stall mode since it is exceeding the discard allowance");
+            }
+            // unfreeze the writer, then wait for the pending push to finish
+            writer.unfreeze();
+            result.get();
+            handler.close();
+            Assert.assertEquals(writer.nextFrameCount(), notDiscarded + 1);
+        } catch (Throwable th) {
+            th.printStackTrace();
+            Assert.fail();
+        }
+        Assert.assertNull(cause);
+    }
+
+    /*
+     * Spill = true;
+     * Discard = true; discard fraction limited to DISCARD_ALLOWANCE
+     * Fixed size frames
+     */
+    @org.junit.Test
+    public void testMemoryFixedSizeFrameWithSpillWithDiscard() {
+        try {
+            int numberOfMemoryFrames = 50;
+            int numberOfSpillFrames = 50;
+            IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+            // Spill budget = Memory budget (50 default-size frames each), discard enabled
+            FeedPolicyAccessor fpa =
+                    createFeedPolicyAccessor(true, true, DEFAULT_FRAME_SIZE * numberOfSpillFrames, DISCARD_ALLOWANCE);
+            // Controlled writer, frozen before any frame is pushed
+            TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE);
+            writer.freeze();
+            // Frame pool sized for numberOfMemoryFrames default-size frames
+            ConcurrentFramePool framePool =
+                    new ConcurrentFramePool(NODE_ID, numberOfMemoryFrames * DEFAULT_FRAME_SIZE, DEFAULT_FRAME_SIZE);
+            FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
+            handler.open();
+            VSizeFrame frame = new VSizeFrame(ctx);
+            for (int i = 0; i < numberOfMemoryFrames; i++) {
+                handler.nextFrame(frame.getBuffer());
+            }
+            // Now we need to verify that the frame pool memory has been consumed!
+            Assert.assertEquals(0, framePool.remaining());
+            Assert.assertEquals(numberOfMemoryFrames, handler.getTotal());
+            Assert.assertEquals(0, handler.getNumSpilled());
+            Assert.assertEquals(0, handler.getNumStalled());
+            Assert.assertEquals(0, handler.getNumDiscarded());
+            for (int i = 0; i < numberOfSpillFrames; i++) {
+                handler.nextFrame(frame.getBuffer());
+            }
+            Assert.assertEquals(0, framePool.remaining());
+            Assert.assertEquals(numberOfMemoryFrames + numberOfSpillFrames, handler.getTotal());
+            Assert.assertEquals(numberOfSpillFrames, handler.getNumSpilled());
+            Assert.assertEquals(0, handler.getNumStalled());
+            Assert.assertEquals(0, handler.getNumDiscarded());
+            // Keep pushing while one more discard still stays within fpa.getMaxFractionDiscard()
+            double numDiscarded = 0;
+            boolean nextShouldDiscard =
+                    ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard();
+            while (nextShouldDiscard) {
+                handler.nextFrame(frame.getBuffer());
+                numDiscarded++;
+                nextShouldDiscard = ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard();
+            }
+            Assert.assertEquals(0, framePool.remaining());
+            Assert.assertEquals((int) (numberOfMemoryFrames + numberOfSpillFrames + numDiscarded), handler.getTotal());
+            Assert.assertEquals(numberOfSpillFrames, handler.getNumSpilled());
+            Assert.assertEquals(0, handler.getNumStalled());
+            Assert.assertEquals((int) numDiscarded, handler.getNumDiscarded());
+            // Next Call should block since we're exceeding the discard allowance
+            Future<?> result = EXECUTOR.submit(new Pusher(frame.getBuffer(), handler));
+            if (result.isDone()) {
+                Assert.fail("The producer should switch to stall mode since it is exceeding the discard allowance");
+            } else {
+                Assert.assertEquals((int) numDiscarded, handler.getNumDiscarded());
+            }
+            // unfreeze the writer, then wait for the pending push to finish
+            writer.unfreeze();
+            result.get();
+            handler.close();
+            Assert.assertTrue(result.isDone());
+            Assert.assertEquals(writer.nextFrameCount(), numberOfMemoryFrames + numberOfSpillFrames + 1);
+        } catch (Throwable th) {
+            th.printStackTrace();
+            Assert.fail();
+        }
+        Assert.assertNull(cause);
+    }
+
+    /*
+     * Spill = false;
+     * Discard = true; discard fraction limited to DISCARD_ALLOWANCE
+     * Variable size frames
+     */
+    @org.junit.Test
+    public void testMemoryVariableSizeFrameNoSpillWithDiscard() {
+        try {
+            int discardTestFrames = 100;
+            Random random = new Random();
+            IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+            // No spill, discard enabled
+            FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, true, DEFAULT_FRAME_SIZE, DISCARD_ALLOWANCE);
+            // Controlled writer, frozen before any frame is pushed
+            TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE);
+            writer.freeze();
+            // Frame pool sized for discardTestFrames default-size frames
+            ConcurrentFramePool framePool =
+                    new ConcurrentFramePool(NODE_ID, discardTestFrames * DEFAULT_FRAME_SIZE, DEFAULT_FRAME_SIZE);
+            FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
+            handler.open();
+            // start with a single default-size buffer
+            ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
+            int multiplier = 1;
+            int numFrames = 0;
+            // push buffers of random size (1 to 10 frames) while multiplier <= framePool.remaining()
+            while ((multiplier <= framePool.remaining())) {
+                numFrames++;
+                handler.nextFrame(buffer);
+                multiplier = random.nextInt(10) + 1;
+                buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * multiplier);
+            }
+            // Next call should NOT block but should discard.
+            double numDiscarded = 0.0;
+            boolean nextShouldDiscard =
+                    ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard();
+            while (nextShouldDiscard) {
+                handler.nextFrame(buffer);
+                numDiscarded++;
+                nextShouldDiscard = ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard();
+            }
+            Future<?> result = EXECUTOR.submit(new Pusher(buffer, handler));
+            if (result.isDone()) {
+                Assert.fail("The producer should switch to stall mode since it is exceeding the discard allowance");
+            } else {
+                // Check that the discard count still matches the number counted above
+                assertEquals((int) numDiscarded, handler.getNumDiscarded());
+                // Check that nothing was spilled
+                assertEquals(handler.getNumSpilled(), 0);
+            }
+            // unfreeze the writer, then wait for the pending push to finish
+            writer.unfreeze();
+            result.get();
+            handler.close();
+            Assert.assertEquals(writer.nextFrameCount(), numFrames + 1);
+            // exit
+        } catch (Throwable th) {
+            th.printStackTrace();
+            Assert.fail();
+        }
+        Assert.assertNull(cause);
+    }
+
+    /*
+     * Spill = false;
+     * Discard = true; discard fraction limited to DISCARD_ALLOWANCE
+     * Fixed size frames
+     */
+    @org.junit.Test
+    public void testMemoryFixedSizeFrameNoSpillWithDiscard() {
+        try {
+            int discardTestFrames = 100;
+            IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+            // No spill, discard enabled
+            FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, true, DEFAULT_FRAME_SIZE, DISCARD_ALLOWANCE);
+            // Controlled writer, frozen before any frame is pushed
+            TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE);
+            writer.freeze();
+            // Frame pool sized for discardTestFrames default-size frames
+            ConcurrentFramePool framePool =
+                    new ConcurrentFramePool(NODE_ID, discardTestFrames * DEFAULT_FRAME_SIZE, DEFAULT_FRAME_SIZE);
+            FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
+            handler.open();
+            VSizeFrame frame = new VSizeFrame(ctx);
+            // push discardTestFrames frames, one per frame the pool was sized for
+            for (int i = 0; i < discardTestFrames; i++) {
+                handler.nextFrame(frame.getBuffer());
+            }
+            // The next calls should NOT block but should discard, until the discard allowance is reached.
+            double numDiscarded = 0.0;
+            boolean nextShouldDiscard =
+                    ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard();
+            while (nextShouldDiscard) {
+                handler.nextFrame(frame.getBuffer());
+                numDiscarded++;
+                nextShouldDiscard = ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard();
+            }
+            // Next Call should block since we're exceeding the discard allowance
+            Future<?> result = EXECUTOR.submit(new Pusher(frame.getBuffer(), handler));
+            if (result.isDone()) {
+                Assert.fail("The producer should switch to stall mode since it is exceeding the discard allowance");
+            } else {
+                // Check that the discard count still matches the number counted above
+                assertEquals((int) numDiscarded, handler.getNumDiscarded());
+                // Check that nothing was spilled
+                assertEquals(handler.getNumSpilled(), 0);
+            }
+            // unfreeze the writer, then wait for the pending push to finish
+            writer.unfreeze();
+            result.get();
+            handler.close();
+            Assert.assertEquals(writer.nextFrameCount(), discardTestFrames + 1);
+            // exit
+        } catch (Throwable th) {
+            th.printStackTrace();
+            Assert.fail();
+        }
+        Assert.assertNull(cause);
+    }
+
+    /*
+     * Spill = true;
+     * Discard = false;
+     * Fixed size frames
+     */
+    @org.junit.Test
+    public void testMemoryFixedSizeFrameWithSpillNoDiscard() {
+        try {
+            IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+            // Spill budget = Memory budget (NUM_FRAMES default-size frames), No discard
+            FeedPolicyAccessor fpa =
+                    createFeedPolicyAccessor(true, false, DEFAULT_FRAME_SIZE * NUM_FRAMES, DISCARD_ALLOWANCE);
+            // Controlled writer, frozen before any frame is pushed
+            TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE);
+            writer.freeze();
+            // Frame pool created with FEED_MEM_BUDGET as its budget argument
+            ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
+            FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
+            handler.open();
+            VSizeFrame frame = new VSizeFrame(ctx);
+            // push NUM_FRAMES frames while the writer is frozen
+            for (int i = 0; i < NUM_FRAMES; i++) {
+                handler.nextFrame(frame.getBuffer());
+            }
+            // Next call should NOT block. we will do it in a different thread
+            Future<?> result = EXECUTOR.submit(new Pusher(frame.getBuffer(), handler));
+            result.get();
+            // Check that no frames were discarded
+            assertEquals(handler.getNumDiscarded(), 0);
+            // Check that one frame is spilled
+            assertEquals(handler.getNumSpilled(), 1);
+            // unfreeze the writer and close the handler; no frames should remain on disk afterwards
+            writer.unfreeze();
+            handler.close();
+            Assert.assertEquals(handler.framesOnDisk(), 0);
+            // exit
+        } catch (Throwable th) {
+            th.printStackTrace();
+            Assert.fail();
+        }
+        Assert.assertNull(cause);
+    }
+
+    /*
+     * Spill = false;
+     * Discard = false;
+     * Fixed size frames
+     * Very fast next operator (no next-frame duration is set on the writer)
+     */
+    @org.junit.Test
+    public void testMemoryFixedSizeFrameNoDiskNoDiscardFastConsumer() {
+        try {
+            int numRounds = 10;
+            IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+            // No spill, No discard, spill budget 0
+            FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, false, 0L, DISCARD_ALLOWANCE);
+            // Plain TestFrameWriter built from two empty lists (no freeze/unfreeze control is used here)
+            TestFrameWriter writer = FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList());
+            // Frame pool created with FEED_MEM_BUDGET as its budget argument
+            ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
+            FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
+            handler.open();
+            VSizeFrame frame = new VSizeFrame(ctx);
+            // push NUM_FRAMES * numRounds frames
+            for (int i = 0; i < NUM_FRAMES * numRounds; i++) {
+                handler.nextFrame(frame.getBuffer());
+            }
+            // Check that no frames were discarded
+            Assert.assertEquals(handler.getNumDiscarded(), 0);
+            // Check that no frames were spilled
+            Assert.assertEquals(handler.getNumSpilled(), 0);
+            writer.validate(false);
+            handler.close();
+            // Check that the writer's nextFrame count equals the number of frames pushed
+            Assert.assertEquals(NUM_FRAMES * numRounds, writer.nextFrameCount());
+            writer.validate(true);
+        } catch (Throwable th) {
+            th.printStackTrace();
+            Assert.fail();
+        }
+        Assert.assertNull(cause);
+    }
+
+    /*
+     * Spill = false;
+     * Discard = false;
+     * Fixed size frames
+     * Slow next operator (writer.setNextDuration(1) is applied before pushing)
+     */
+    @org.junit.Test
+    public void testMemoryFixedSizeFrameNoDiskNoDiscardSlowConsumer() {
+        try {
+            int numRounds = 10;
+            IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+            // No spill, No discard, spill budget 0
+            FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, false, 0L, DISCARD_ALLOWANCE);
+            // Plain TestFrameWriter built from two empty lists (no freeze/unfreeze control is used here)
+            TestFrameWriter writer = FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList());
+            // Frame pool created with FEED_MEM_BUDGET as its budget argument
+            ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
+            FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
+            handler.open();
+            VSizeFrame frame = new VSizeFrame(ctx);
+            writer.setNextDuration(1);
+            // push NUM_FRAMES * numRounds frames
+            for (int i = 0; i < NUM_FRAMES * numRounds; i++) {
+                handler.nextFrame(frame.getBuffer());
+            }
+            // Check that no frames were discarded
+            Assert.assertEquals(handler.getNumDiscarded(), 0);
+            // Check that no frames were spilled
+            Assert.assertEquals(handler.getNumSpilled(), 0);
+            // Validate the writer, close the handler, then check nextFrame was called once per pushed frame
+            writer.validate(false);
+            handler.close();
+            Assert.assertEquals(writer.nextFrameCount(), (NUM_FRAMES * numRounds));
+            writer.validate(true);
+        } catch (Throwable th) {
+            th.printStackTrace();
+            Assert.fail();
+        }
+        Assert.assertNull(cause);
+    }
+
+    /*
+     * Spill = false; Discard = false
+     * VarSizeFrame
+     * NOTE(review): no @org.junit.Test here (unlike its siblings) and handler.close() is never called - confirm
+     */
+    public void testMemoryVarSizeFrameNoDiskNoDiscard() {
+        try {
+            Random random = new Random();
+            IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+            // No spill, No discard, spill budget 0
+            FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, false, 0L, DISCARD_ALLOWANCE);
+            // Controlled writer, frozen before any frame is pushed
+            TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE);
+            writer.freeze();
+            // Frame pool created with FEED_MEM_BUDGET as its budget argument
+            ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
+            FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
+            handler.open();
+            ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
+            int multiplier = 1;
+            // push buffers of random size (1 to 10 frames) while multiplier <= framePool.remaining()
+            while ((multiplier <= framePool.remaining())) {
+                handler.nextFrame(buffer);
+                multiplier = random.nextInt(10) + 1;
+                buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * multiplier);
+            }
+            // we can't satisfy the next request
+            // Next call should block we will do it in a different thread
+            Future<?> result = EXECUTOR.submit(new Pusher(buffer, handler));
+            // Check that the nextFrame didn't return
+            if (result.isDone()) {
+                Assert.fail();
+            }
+            // Check that no frames were discarded
+            assertEquals(handler.getNumDiscarded(), 0);
+            // Check that no frames were spilled
+            assertEquals(handler.getNumSpilled(), 0);
+            // Check that number of stalled is not greater than 1
+            Assert.assertTrue(handler.getNumStalled() <= 1);
+            writer.unfreeze();
+            result.get();
+        } catch (Throwable th) {
+            th.printStackTrace();
+            Assert.fail();
+        }
+        Assert.assertNull(cause);
+    }
+
+    /*
+     * Spill = true;
+     * Discard = false;
+     * Variable size frames
+     *
+     * Pushes buffers of random size (1 to 10 times DEFAULT_FRAME_SIZE) into a handler whose
+     * writer is frozen, for as long as the size multiplier of the next buffer does not exceed
+     * framePool.remaining(). The following push is expected to return without blocking and to
+     * be counted as one spilled frame, which must be gone from disk after the writer is
+     * unfrozen.
+     */
+    @org.junit.Test
+    public void testMemoryVarSizeFrameWithSpillNoDiscard() {
+        try {
+            Random random = new Random();
+            IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+            // Spill budget = Memory budget, No discard
+            FeedPolicyAccessor fpa =
+                    createFeedPolicyAccessor(true, false, DEFAULT_FRAME_SIZE * NUM_FRAMES, DISCARD_ALLOWANCE);
+            // Non-Active Writer
+            TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE);
+            writer.freeze();
+            // FramePool
+            ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
+            FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
+            handler.open();
+            ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
+            int multiplier = 1;
+            // keep pushing while the multiplier of the buffer about to be pushed fits in the pool;
+            // the buffer for the next round is allocated with a fresh random multiplier
+            while ((multiplier <= framePool.remaining())) {
+                handler.nextFrame(buffer);
+                multiplier = random.nextInt(10) + 1;
+                buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * multiplier);
+            }
+            // Next call should Not block. we will do it in a different thread
+            Future<?> result = EXECUTOR.submit(new Pusher(buffer, handler));
+            result.get();
+            // JUnit expects (expected, actual); the expected value goes first
+            // Check that no records were discarded
+            assertEquals(0, handler.getNumDiscarded());
+            // Check that one frame is spilled
+            assertEquals(1, handler.getNumSpilled());
+            // consume memory frames
+            while (!handler.getInternalBuffer().isEmpty()) {
+                writer.kick();
+            }
+            // There should be 1 frame on disk
+            Assert.assertEquals(1, handler.framesOnDisk());
+            writer.unfreeze();
+            // the future already completed above, so this call returns immediately
+            result.get();
+            Assert.assertEquals(0, handler.framesOnDisk());
+        } catch (Throwable th) {
+            th.printStackTrace();
+            Assert.fail();
+        }
+        // Pusher stores any HyracksDataException it catches in 'cause'
+        Assert.assertNull(cause);
+    }
+
+    /*
+     * Spill = false;
+     * Discard = false;
+     * Fixed size frames
+     *
+     * Pushes NUM_FRAMES frames into a handler whose writer is frozen, then expects one more
+     * push, issued from another thread, to block until the writer is kicked.
+     */
+    @org.junit.Test
+    public void testMemoryFixedSizeFrameNoDiskNoDiscard() {
+        try {
+            IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+            // No spill, No discard
+            FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, false, 0L, DISCARD_ALLOWANCE);
+            // Non-Active Writer
+            TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE);
+            writer.freeze();
+            // FramePool
+            ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
+
+            FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
+            handler.open();
+            VSizeFrame frame = new VSizeFrame(ctx);
+            // add NUM_FRAMES times
+            for (int i = 0; i < NUM_FRAMES; i++) {
+                handler.nextFrame(frame.getBuffer());
+            }
+            // Next call should block we will do it in a different thread
+            Future<?> result = EXECUTOR.submit(new Pusher(frame.getBuffer(), handler));
+            // Check that the nextFrame didn't return
+            if (result.isDone()) {
+                Assert.fail();
+            } else {
+                // JUnit expects (expected, actual); the expected value goes first
+                // Check that no records were discarded
+                Assert.assertEquals(0, handler.getNumDiscarded());
+                // Check that no records were spilled
+                Assert.assertEquals(0, handler.getNumSpilled());
+                // TODO: also check that the inputHandler subscribed to the framePool
+                // (no assertion for this exists yet)
+                // Check that number of stalled is not greater than 1
+                Assert.assertTrue(handler.getNumStalled() <= 1);
+                // let the frozen writer complete one nextFrame call
+                writer.kick();
+            }
+            result.get();
+            writer.unfreeze();
+            handler.close();
+        } catch (Throwable th) {
+            th.printStackTrace();
+            Assert.fail();
+        }
+        // Pusher stores any HyracksDataException it catches in 'cause'
+        Assert.assertNull(cause);
+    }
+
+    /*
+     * Task that pushes one buffer into a frame writer. It is meant to be run on another
+     * thread so that the submitting test can observe whether the push blocks. A
+     * HyracksDataException raised by the push is printed and stored in the enclosing
+     * test's 'cause' field.
+     */
+    private class Pusher implements Runnable {
+        private final IFrameWriter target;
+        private final ByteBuffer frame;
+
+        public Pusher(ByteBuffer buffer, IFrameWriter writer) {
+            target = writer;
+            frame = buffer;
+        }
+
+        @Override
+        public void run() {
+            try {
+                target.nextFrame(frame);
+            } catch (HyracksDataException failure) {
+                failure.printStackTrace();
+                // remember the failure on the enclosing test instance
+                cause = failure;
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/803a3a2f/hyracks-fullstack/hyracks/hyracks-api/pom.xml
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/pom.xml b/hyracks-fullstack/hyracks/hyracks-api/pom.xml
index 3961921..9336921 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-api/pom.xml
@@ -39,7 +39,23 @@
   <properties>
     <root.dir>${basedir}/../..</root.dir>
   </properties>
-
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <version>2.4</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+              <goal>jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
   <dependencies>
     <dependency>
       <groupId>org.json</groupId>
@@ -77,5 +93,22 @@
       <artifactId>hyracks-util</artifactId>
       <version>0.2.18-SNAPSHOT</version>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <version>2.0.2-beta</version>
+    </dependency>
+    <dependency>
+      <groupId>org.powermock</groupId>
+      <artifactId>powermock-api-mockito</artifactId>
+      <version>1.6.2</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.powermock</groupId>
+      <artifactId>powermock-module-junit4</artifactId>
+      <version>1.6.2</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/803a3a2f/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/CountAndThrowError.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/CountAndThrowError.java b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/CountAndThrowError.java
new file mode 100644
index 0000000..19998a7
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/CountAndThrowError.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.test;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.mockito.invocation.InvocationOnMock;
+
+/**
+ * A {@link CountAnswer} that counts every invocation and then throws an
+ * {@link UnknownError} carrying the message supplied at construction time.
+ */
+public class CountAndThrowError extends CountAnswer {
+    private String message;
+
+    public CountAndThrowError(String errorMessage) {
+        message = errorMessage;
+    }
+
+    /** Counts the mock invocation, then throws the configured error. */
+    @Override
+    public Object answer(InvocationOnMock invocation) throws Throwable {
+        throw countAndCreateError();
+    }
+
+    /** Counts the direct call, then throws the configured error. */
+    @Override
+    public Object call() throws HyracksDataException {
+        throw countAndCreateError();
+    }
+
+    // Increments the inherited counter and builds the error for the caller to throw.
+    private UnknownError countAndCreateError() {
+        count++;
+        return new UnknownError(message);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/803a3a2f/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/CountAndThrowException.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/CountAndThrowException.java b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/CountAndThrowException.java
new file mode 100644
index 0000000..5a5ad59
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/CountAndThrowException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.test;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.mockito.invocation.InvocationOnMock;
+
+/**
+ * A {@link CountAnswer} that counts every invocation and then throws a
+ * {@link HyracksDataException} carrying the message supplied at construction time.
+ */
+public class CountAndThrowException extends CountAnswer {
+    private String message;
+
+    public CountAndThrowException(String errorMessage) {
+        message = errorMessage;
+    }
+
+    /** Counts the mock invocation, then throws the configured exception. */
+    @Override
+    public Object answer(InvocationOnMock invocation) throws Throwable {
+        throw countAndCreateException();
+    }
+
+    /** Counts the direct call, then throws the configured exception. */
+    @Override
+    public Object call() throws HyracksDataException {
+        throw countAndCreateException();
+    }
+
+    // Increments the inherited counter and builds the exception for the caller to throw.
+    private HyracksDataException countAndCreateException() {
+        count++;
+        return new HyracksDataException(message);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/803a3a2f/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/CountAnswer.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/CountAnswer.java b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/CountAnswer.java
new file mode 100644
index 0000000..e8a6654
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/CountAnswer.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.test;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Mockito {@link Answer} that records how many times it has been invoked, either through
+ * a mock ({@link #answer(InvocationOnMock)}) or directly ({@link #call()}). Both entry
+ * points return {@code null}. The counter is a plain {@code int} field that subclasses
+ * update directly.
+ */
+public class CountAnswer implements Answer<Object> {
+    protected int count = 0;
+
+    /** Returns the number of invocations counted since creation or the last {@link #reset()}. */
+    public int getCallCount() {
+        return count;
+    }
+
+    /** Sets the invocation counter back to zero. */
+    public void reset() {
+        count = 0;
+    }
+
+    /** Counts one direct invocation and returns {@code null}. */
+    public Object call() throws HyracksDataException {
+        count += 1;
+        return null;
+    }
+
+    /** Counts one mock invocation and returns {@code null}. */
+    @Override
+    public Object answer(InvocationOnMock invocation) throws Throwable {
+        count += 1;
+        return null;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/803a3a2f/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java
new file mode 100644
index 0000000..4bddfa9
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.test;
+
+import java.util.Collection;
+
+public class FrameWriterTestUtils {
+    public static final String EXCEPTION_MESSAGE = "IFrameWriter Exception in the call to the method ";
+    public static final String ERROR_MESSAGE = "IFrameWriter Error in the call to the method ";
+
+    public enum FrameWriterOperation {
+        Open,
+        NextFrame,
+        Fail,
+        Flush,
+        Close
+    }
+
+    public static TestFrameWriter create(Collection<FrameWriterOperation> exceptionThrowingOperations,
+            Collection<FrameWriterOperation> errorThrowingOperations) {
+        CountAnswer openAnswer =
+                createAnswer(FrameWriterOperation.Open, exceptionThrowingOperations, errorThrowingOperations);
+        CountAnswer nextAnswer =
+                createAnswer(FrameWriterOperation.NextFrame, exceptionThrowingOperations, errorThrowingOperations);
+        CountAnswer flushAnswer =
+                createAnswer(FrameWriterOperation.Flush, exceptionThrowingOperations, errorThrowingOperations);
+        CountAnswer failAnswer =
+                createAnswer(FrameWriterOperation.Fail, exceptionThrowingOperations, errorThrowingOperations);
+        CountAnswer closeAnswer =
+                createAnswer(FrameWriterOperation.Close, exceptionThrowingOperations, errorThrowingOperations);
+        return new TestFrameWriter(openAnswer, nextAnswer, flushAnswer, failAnswer, closeAnswer);
+    }
+
+    public static CountAnswer createAnswer(FrameWriterOperation operation,
+            Collection<FrameWriterOperation> exceptionThrowingOperations,
+            Collection<FrameWriterOperation> errorThrowingOperations) {
+        if (exceptionThrowingOperations.contains(operation)) {
+            return new CountAndThrowException(EXCEPTION_MESSAGE + operation.toString());
+        } else if (exceptionThrowingOperations.contains(operation)) {
+            return new CountAndThrowError(ERROR_MESSAGE + operation.toString());
+        } else {
+            return new CountAnswer();
+        }
+    }
+
+    public static TestControlledFrameWriter create(int initialFrameSize) {
+        return new TestControlledFrameWriter(initialFrameSize);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/803a3a2f/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestControlledFrameWriter.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestControlledFrameWriter.java b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestControlledFrameWriter.java
new file mode 100644
index 0000000..2a3f70d
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestControlledFrameWriter.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.test;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * A {@link TestFrameWriter} whose {@code nextFrame} can be held back by the test. While
+ * frozen, each {@code nextFrame} call waits until it is released by {@link #kick()} (one
+ * call per kick) or by {@link #unfreeze()}; in timed mode it waits for a fixed duration
+ * instead.
+ */
+public class TestControlledFrameWriter extends TestFrameWriter {
+    private boolean frozen = false;
+    private boolean timed = false;
+    private long duration = Long.MAX_VALUE;
+    private final int initialFrameSize;
+    // buffer.capacity() / initialFrameSize of the frame being written; 0 when no call is in progress
+    private volatile int currentMultiplier = 0;
+    // kicks issued by the test that no nextFrame call has consumed yet
+    private volatile int kicks = 0;
+
+    public TestControlledFrameWriter(int initialFrameSize) {
+        super(new CountAnswer(), new CountAnswer(), new CountAnswer(), new CountAnswer(), new CountAnswer());
+        this.initialFrameSize = initialFrameSize;
+    }
+
+    /** Returns the size multiplier of the frame currently being written, or 0 if there is none. */
+    public int getCurrentMultiplier() {
+        return currentMultiplier;
+    }
+
+    /** Makes subsequent {@code nextFrame} calls wait for a kick or an unfreeze. */
+    public synchronized void freeze() {
+        frozen = true;
+    }
+
+    /**
+     * Switches to timed mode: subsequent {@code nextFrame} calls wait on this writer's
+     * monitor for up to {@code ms} milliseconds.
+     */
+    public synchronized void time(long ms) {
+        frozen = true;
+        timed = true;
+        duration = ms;
+    }
+
+    /** Clears the frozen state and wakes up waiting {@code nextFrame} calls. */
+    public synchronized void unfreeze() {
+        frozen = false;
+        // notifyAll rather than notify: the inherited delay also waits on this monitor, so a
+        // single notification could be delivered to the wrong waiter
+        notifyAll();
+    }
+
+    /** Allows exactly one frozen {@code nextFrame} call to proceed. */
+    public synchronized void kick() {
+        kicks++;
+        // waiters re-check their condition, so waking all of them still releases only one
+        notifyAll();
+    }
+
+    @Override
+    public synchronized void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        super.nextFrame(buffer);
+        currentMultiplier = buffer.capacity() / initialFrameSize;
+        if (frozen) {
+            try {
+                if (timed) {
+                    wait(duration);
+                } else {
+                    while (frozen && kicks == 0) {
+                        wait();
+                    }
+                    // Consume a kick only if one released this call. When the loop ended
+                    // because of unfreeze() there is nothing to consume, and decrementing
+                    // would drive the counter negative and swallow a later kick.
+                    if (kicks > 0) {
+                        kicks--;
+                    }
+                }
+            } catch (InterruptedException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+        currentMultiplier = 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/803a3a2f/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestFrameWriter.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestFrameWriter.java b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestFrameWriter.java
new file mode 100644
index 0000000..b3492fe
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestFrameWriter.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.test;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * An {@link IFrameWriter} for tests. Each writer operation first waits for its configured
+ * delay (if greater than zero) and then invokes the {@link CountAnswer} assigned to it,
+ * which counts the call and may throw.
+ */
+public class TestFrameWriter implements IFrameWriter {
+    // one answer per writer operation
+    private final CountAnswer openAnswer;
+    private final CountAnswer nextAnswer;
+    private final CountAnswer flushAnswer;
+    private final CountAnswer failAnswer;
+    private final CountAnswer closeAnswer;
+    // per-operation delay in milliseconds; a value of 0 or less means no delay
+    private long openDuration = 0L;
+    private long nextDuration = 0L;
+    private long flushDuration = 0L;
+    private long failDuration = 0L;
+    private long closeDuration = 0L;
+
+    public TestFrameWriter(CountAnswer openAnswer, CountAnswer nextAnswer, CountAnswer flushAnswer,
+            CountAnswer failAnswer, CountAnswer closeAnswer) {
+        this.openAnswer = openAnswer;
+        this.nextAnswer = nextAnswer;
+        this.flushAnswer = flushAnswer;
+        this.failAnswer = failAnswer;
+        this.closeAnswer = closeAnswer;
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        delay(openDuration);
+        openAnswer.call();
+    }
+
+    /** Returns how many times {@link #open()} has been counted. */
+    public int openCount() {
+        return openAnswer.getCallCount();
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        delay(nextDuration);
+        nextAnswer.call();
+    }
+
+    /** Returns how many times {@link #nextFrame(ByteBuffer)} has been counted. */
+    public int nextFrameCount() {
+        return nextAnswer.getCallCount();
+    }
+
+    @Override
+    public void flush() throws HyracksDataException {
+        delay(flushDuration);
+        flushAnswer.call();
+    }
+
+    /** Returns how many times {@link #flush()} has been counted. */
+    public int flushCount() {
+        return flushAnswer.getCallCount();
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        delay(failDuration);
+        failAnswer.call();
+    }
+
+    /** Returns how many times {@link #fail()} has been counted. */
+    public int failCount() {
+        return failAnswer.getCallCount();
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        delay(closeDuration);
+        closeAnswer.call();
+    }
+
+    /** Returns how many times {@link #close()} has been counted. */
+    public int closeCount() {
+        return closeAnswer.getCallCount();
+    }
+
+    /**
+     * Checks the recorded call counts against the expected writer lifecycle.
+     *
+     * @param finished whether the writer is expected to have completed its lifecycle
+     * @return {@code false} if open, fail or close was called more than once, if nextFrame,
+     *         fail or close was called without open, or if {@code finished} is set and the
+     *         writer saw open, nextFrame or fail but was never closed; {@code true} otherwise
+     */
+    public synchronized boolean validate(boolean finished) {
+        final int opens = openAnswer.getCallCount();
+        final int nexts = nextAnswer.getCallCount();
+        final int fails = failAnswer.getCallCount();
+        final int closes = closeAnswer.getCallCount();
+
+        // open, fail and close may each happen at most once
+        if (opens > 1 || fails > 1 || closes > 1) {
+            return false;
+        }
+        // nothing else may be called on a writer that was never opened
+        if (opens == 0 && (nexts > 0 || fails > 0 || closes > 0)) {
+            return false;
+        }
+        // a finished writer that saw any activity must have been closed
+        final boolean touched = opens > 0 || nexts > 0 || fails > 0;
+        return !(finished && closes == 0 && touched);
+    }
+
+    /** Resets the call counters of all five answers. */
+    public void reset() {
+        openAnswer.reset();
+        nextAnswer.reset();
+        flushAnswer.reset();
+        failAnswer.reset();
+        closeAnswer.reset();
+    }
+
+    public long getOpenDuration() {
+        return openDuration;
+    }
+
+    public void setOpenDuration(long openDuration) {
+        this.openDuration = openDuration;
+    }
+
+    public long getNextDuration() {
+        return nextDuration;
+    }
+
+    public void setNextDuration(long nextDuration) {
+        this.nextDuration = nextDuration;
+    }
+
+    public long getFlushDuration() {
+        return flushDuration;
+    }
+
+    public void setFlushDuration(long flushDuration) {
+        this.flushDuration = flushDuration;
+    }
+
+    public long getFailDuration() {
+        return failDuration;
+    }
+
+    public void setFailDuration(long failDuration) {
+        this.failDuration = failDuration;
+    }
+
+    public long getCloseDuration() {
+        return closeDuration;
+    }
+
+    public void setCloseDuration(long closeDuration) {
+        this.closeDuration = closeDuration;
+    }
+
+    // Waits on this writer's monitor for up to the given number of milliseconds; does
+    // nothing for non-positive durations. An interrupt is rethrown as HyracksDataException.
+    private void delay(long duration) throws HyracksDataException {
+        if (duration <= 0) {
+            return;
+        }
+        synchronized (this) {
+            try {
+                wait(duration);
+            } catch (InterruptedException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/803a3a2f/hyracks-fullstack/hyracks/hyracks-storage-am-btree/pom.xml
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/pom.xml b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/pom.xml
index 581fde4..d07d633 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/pom.xml
@@ -39,6 +39,13 @@
   <dependencies>
     <dependency>
       <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-api</artifactId>
+      <version>0.2.18-SNAPSHOT</version>
+      <type>test-jar</type>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
       <artifactId>hyracks-storage-common</artifactId>
       <version>0.2.18-SNAPSHOT</version>
       <type>jar</type>

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/803a3a2f/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java
index df3a211..d3e7a3a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java
@@ -30,6 +30,9 @@ import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.test.CountAndThrowError;
+import org.apache.hyracks.api.test.CountAndThrowException;
+import org.apache.hyracks.api.test.CountAnswer;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -92,8 +95,8 @@ public class FramewriterTest {
     public boolean validate(boolean finished) {
         // get number of open calls
         int openCount = openException.getCallCount() + openNormal.getCallCount() + openError.getCallCount();
-        int nextFrameCount = nextFrameException.getCallCount() + nextFrameNormal.getCallCount()
-                + nextFrameError.getCallCount();
+        int nextFrameCount =
+                nextFrameException.getCallCount() + nextFrameNormal.getCallCount() + nextFrameError.getCallCount();
         int failCount = failException.getCallCount() + failNormal.getCallCount() + failError.getCallCount();
         int closeCount = closeException.getCallCount() + closeNormal.getCallCount() + closeError.getCallCount();
 
@@ -422,8 +425,9 @@ public class FramewriterTest {
     public AbstractTreeIndexOperatorDescriptor[] mockIndexOpDesc() throws HyracksDataException, IndexException {
         IIndexDataflowHelperFactory[] indexDataflowHelperFactories = mockIndexHelperFactories();
         ISearchOperationCallbackFactory[] searchOpCallbackFactories = mockSearchOpCallbackFactories();
-        AbstractTreeIndexOperatorDescriptor[] opDescs = new AbstractTreeIndexOperatorDescriptor[indexDataflowHelperFactories.length
-                * searchOpCallbackFactories.length];
+        AbstractTreeIndexOperatorDescriptor[] opDescs =
+                new AbstractTreeIndexOperatorDescriptor[indexDataflowHelperFactories.length
+                        * searchOpCallbackFactories.length];
         int k = 0;
         for (int i = 0; i < indexDataflowHelperFactories.length; i++) {
             for (int j = 0; j < searchOpCallbackFactories.length; j++) {
@@ -452,52 +456,6 @@ public class FramewriterTest {
         return opCallback;
     }
 
-    public class CountAnswer implements Answer<Object> {
-        protected int count = 0;
-
-        @Override
-        public Object answer(InvocationOnMock invocation) throws Throwable {
-            count++;
-            return null;
-        }
-
-        public int getCallCount() {
-            return count;
-        }
-
-        public void reset() {
-            count = 0;
-        }
-    }
-
-    public class CountAndThrowException extends CountAnswer {
-        private String errorMessage;
-
-        public CountAndThrowException(String errorMessage) {
-            this.errorMessage = errorMessage;
-        }
-
-        @Override
-        public Object answer(InvocationOnMock invocation) throws Throwable {
-            count++;
-            throw new HyracksDataException(errorMessage);
-        }
-    }
-
-    public class CountAndThrowError extends CountAnswer {
-        private String errorMessage;
-
-        public CountAndThrowError(String errorMessage) {
-            this.errorMessage = errorMessage;
-        }
-
-        @Override
-        public Object answer(InvocationOnMock invocation) throws Throwable {
-            count++;
-            throw new UnknownError(errorMessage);
-        }
-    }
-
     public IFrameWriter[] createOutputWriters() throws Exception {
         CountAnswer[] opens = new CountAnswer[] { openNormal, openException, openError };
         CountAnswer[] nextFrames = new CountAnswer[] { nextFrameNormal, nextFrameException, nextFrameError };


Mime
View raw message