Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id D0873200BB4 for ; Mon, 17 Oct 2016 21:55:00 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id CEDEB160AE2; Mon, 17 Oct 2016 19:55:00 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 79031160AFD for ; Mon, 17 Oct 2016 21:54:59 +0200 (CEST) Received: (qmail 55188 invoked by uid 500); 17 Oct 2016 19:54:53 -0000 Mailing-List: contact commits-help@asterixdb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@asterixdb.apache.org Delivered-To: mailing list commits@asterixdb.apache.org Received: (qmail 53869 invoked by uid 99); 17 Oct 2016 19:54:52 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 17 Oct 2016 19:54:52 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 90934E0FC4; Mon, 17 Oct 2016 19:54:52 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: prestonc@apache.org To: commits@asterixdb.apache.org Date: Mon, 17 Oct 2016 19:55:27 -0000 Message-Id: <2a969e57b9d2451d95764df37792d735@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [37/50] [abbrv] asterixdb git commit: Snapshot after workign frame tuple appender. archived-at: Mon, 17 Oct 2016 19:55:01 -0000 Snapshot after workign frame tuple appender. Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/23eab43d Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/23eab43d Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/23eab43d Branch: refs/heads/ecarm002/interval_join_merge Commit: 23eab43dda9860030c2065343208a05dcc437486 Parents: fd514a0 Author: Preston Carman Authored: Sun Sep 25 09:37:33 2016 -0700 Committer: Preston Carman Committed: Sun Sep 25 09:37:33 2016 -0700 ---------------------------------------------------------------------- .../intervalindex/IntervalIndexJoiner.java | 77 +++++++++++--------- ...IntervalPartitionJoinOperatorDescriptor.java | 2 + .../IntervalPartitionJoiner.java | 15 +++- .../sort/util/DeletableFrameTupleAppender.java | 1 + .../util/DeletableFrameTupleAppenderTest.java | 50 +++++++++---- 5 files changed, 93 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/23eab43d/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java index d3aaa65..a4ad666 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java @@ -19,7 +19,6 @@ package org.apache.asterix.runtime.operators.joins.intervalindex; import java.util.Comparator; -import java.util.LinkedList; import java.util.List; import java.util.logging.Level; import java.util.logging.Logger; @@ -62,7 +61,7 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { private final int[] streamIndex; private final RunFileStream[] runFileStream; - private final LinkedList buffer = new LinkedList<>(); +// private final LinkedList buffer = new LinkedList<>(); private final IIntervalMergeJoinChecker imjc; @@ -124,6 +123,8 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { runFileStream[LEFT_PARTITION] = new RunFileStream(ctx, "left", status.branch[LEFT_PARTITION]); runFileStream[RIGHT_PARTITION] = new RunFileStream(ctx, "right", status.branch[RIGHT_PARTITION]); + LOGGER.setLevel(Level.FINE); + System.out.println("IntervalIndexJoiner: Logging level is: " + LOGGER.getLevel()); if (LOGGER.isLoggable(Level.FINE)) { LOGGER.fine("IntervalIndexJoiner has started partition " + partition + " with " + memorySize + " frames of memory."); @@ -246,9 +247,11 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { long leftStart = IntervalJoinUtil.getIntervalStart(inputAccessor[LEFT_PARTITION], leftKey); long rightStart = IntervalJoinUtil.getIntervalStart(inputAccessor[RIGHT_PARTITION], rightKey); if (leftStart < rightStart) { + // Left stream has next tuple, check if right active must be updated first. return activeManager[RIGHT_PARTITION].hasRecords() && activeManager[RIGHT_PARTITION].getTopPoint() < leftStart; } else { + // Right stream has next tuple, check if left active must be update first. return !(activeManager[LEFT_PARTITION].hasRecords() && activeManager[LEFT_PARTITION].getTopPoint() < rightStart); } @@ -334,7 +337,9 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { // Add to active, end point index and buffer. TuplePointer tp = new TuplePointer(); if (activeManager[LEFT_PARTITION].addTuple(inputAccessor[LEFT_PARTITION], tp)) { - buffer.add(tp); + processTupleJoin(activeManager[RIGHT_PARTITION].getActiveList(), memoryAccessor[RIGHT_PARTITION], + inputAccessor[LEFT_PARTITION], true, writer); +// buffer.add(tp); } else { // Spill case freezeAndSpill(); @@ -348,10 +353,10 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { } while (loadLeftTuple().isLoaded() && loadRightTuple().isLoaded() && !checkToProcessRightTuple()); // Add Results - if (!buffer.isEmpty()) { - processActiveJoin(activeManager[RIGHT_PARTITION].getActiveList(), memoryAccessor[RIGHT_PARTITION], buffer, - memoryAccessor[LEFT_PARTITION], true, writer); - } +// if (!buffer.isEmpty()) { +// processActiveJoin(activeManager[RIGHT_PARTITION].getActiveList(), memoryAccessor[RIGHT_PARTITION], buffer, +// memoryAccessor[LEFT_PARTITION], true, writer); +// } } private void processRightTuple(IFrameWriter writer) throws HyracksDataException { @@ -364,7 +369,9 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { // Add to active, end point index and buffer. TuplePointer tp = new TuplePointer(); if (activeManager[RIGHT_PARTITION].addTuple(inputAccessor[RIGHT_PARTITION], tp)) { - buffer.add(tp); + processTupleJoin(activeManager[LEFT_PARTITION].getActiveList(), memoryAccessor[LEFT_PARTITION], + inputAccessor[RIGHT_PARTITION], false, writer); +// buffer.add(tp); } else { // Spill case freezeAndSpill(); @@ -378,32 +385,32 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { } while (loadRightTuple().isLoaded() && checkToProcessRightTuple()); // Add Results - if (!buffer.isEmpty()) { - processActiveJoin(activeManager[LEFT_PARTITION].getActiveList(), memoryAccessor[LEFT_PARTITION], buffer, - memoryAccessor[RIGHT_PARTITION], false, writer); - } +// if (!buffer.isEmpty()) { +// processActiveJoin(activeManager[LEFT_PARTITION].getActiveList(), memoryAccessor[LEFT_PARTITION], buffer, +// memoryAccessor[RIGHT_PARTITION], false, writer); +// } } - private void processActiveJoin(List outer, ITuplePointerAccessor outerAccessor, - List inner, ITuplePointerAccessor innerAccessor, boolean reversed, IFrameWriter writer) - throws HyracksDataException { - for (TuplePointer outerTp : outer) { - outerAccessor.reset(outerTp); - for (TuplePointer innerTp : inner) { - innerAccessor.reset(innerTp); - if (imjc.checkToSaveInResult(outerAccessor, outerTp.getTupleIndex(), innerAccessor, - innerTp.getTupleIndex(), reversed)) { - addToResult(outerAccessor, outerTp.getTupleIndex(), innerAccessor, innerTp.getTupleIndex(), - reversed, writer); - } - joinComparisonCount++; - } - } - if (LOGGER.isLoggable(Level.FINE)) { - LOGGER.fine("Sweep for " + buffer.size() + " tuples"); - } - buffer.clear(); - } +// private void processActiveJoin(List outer, ITuplePointerAccessor outerAccessor, +// List inner, ITuplePointerAccessor innerAccessor, boolean reversed, IFrameWriter writer) +// throws HyracksDataException { +// for (TuplePointer outerTp : outer) { +// outerAccessor.reset(outerTp); +// for (TuplePointer innerTp : inner) { +// innerAccessor.reset(innerTp); +// if (imjc.checkToSaveInResult(outerAccessor, outerTp.getTupleIndex(), innerAccessor, +// innerTp.getTupleIndex(), reversed)) { +// addToResult(outerAccessor, outerTp.getTupleIndex(), innerAccessor, innerTp.getTupleIndex(), +// reversed, writer); +// } +// joinComparisonCount++; +// } +// } +// if (LOGGER.isLoggable(Level.FINE)) { +// LOGGER.fine("Sweep for " + buffer.size() + " tuples"); +// } +// buffer.clear(); +// } private void processTupleJoin(List outer, ITuplePointerAccessor outerAccessor, ITupleAccessor tupleAccessor, boolean reversed, IFrameWriter writer) throws HyracksDataException { @@ -456,6 +463,7 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { private void unfreezeAndContinue(int frozenPartition, ITupleAccessor accessor) throws HyracksDataException { int flushPartition = frozenPartition == LEFT_PARTITION ? RIGHT_PARTITION : LEFT_PARTITION; + runFileStream[frozenPartition].flushAndStopRunFile(accessor); if (LOGGER.isLoggable(Level.WARNING)) { LOGGER.warning("snapshot(" + frozenPartition + "): " + frameCounts[RIGHT_PARTITION] + " right, " + frameCounts[LEFT_PARTITION] + " left, left[" + bufferManager.getNumTuples(LEFT_PARTITION) @@ -474,8 +482,6 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { spillReadCount[RIGHT_PARTITION] = runFileStream[RIGHT_PARTITION].getReadCount(); spillWriteCount[RIGHT_PARTITION] = runFileStream[RIGHT_PARTITION].getWriteCount(); } - - runFileStream[frozenPartition].flushAndStopRunFile(accessor); flushMemory(flushPartition); if ((LEFT_PARTITION == frozenPartition && !status.branch[LEFT_PARTITION].isRunFileReading()) || (RIGHT_PARTITION == frozenPartition && !status.branch[RIGHT_PARTITION].isRunFileReading())) { @@ -489,8 +495,7 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { @Override public void closeInput(int partition) throws HyracksDataException { - // TODO Auto-generated method stub - + // No changes are required. } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/23eab43d/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java index 6ea1e6f..c7986e6 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java @@ -160,6 +160,8 @@ public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDes BUILD_REL, PROBE_REL, imjc, buildRd, probeRd, buildHpc, probeHpc); state.ipj.initBuild(); + LOGGER.setLevel(Level.FINE); + System.out.println("IntervalPartitionJoinOperatorDescriptor: Logging level is: " + LOGGER.getLevel()); if (LOGGER.isLoggable(Level.FINE)) { LOGGER.fine("IntervalPartitionJoin is starting the build phase with " + state.k + " granules repesenting " + state.intervalPartitions + " interval partitions using " http://git-wip-us.apache.org/repos/asf/asterixdb/blob/23eab43d/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java index e943a48..31e200b 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java @@ -100,6 +100,8 @@ public class IntervalPartitionJoiner { private long spillCount = 0; private long spillReadCount = 0; private long spillWriteCount = 0; + private long buildSize; + private int tmp = -1; public IntervalPartitionJoiner(IHyracksTaskContext ctx, int memForJoin, int k, int numOfPartitions, String buildRelName, String probeRelName, IIntervalMergeJoinChecker imjc, RecordDescriptor buildRd, @@ -129,6 +131,8 @@ public class IntervalPartitionJoiner { public void initBuild() throws HyracksDataException { buildBufferManager = new VPartitionTupleBufferManager(ctx, getPartitionMemoryConstrain(), numOfPartitions, memForJoin * ctx.getInitialFrameSize()); + System.err.println("k: " + k); + buildSize = 0; } private IPartitionedMemoryConstrain getPartitionMemoryConstrain() { @@ -139,14 +143,23 @@ public class IntervalPartitionJoiner { accessorBuild.reset(buffer); int tupleCount = accessorBuild.getTupleCount(); + int pid; for (int i = 0; i < tupleCount; ++i) { - int pid = buildHpc.partition(accessorBuild, i, k); + pid = buildHpc.partition(accessorBuild, i, k); + + if (tmp != pid) { + System.err.println("buildSize: " + buildSize + " pid: " + pid + " k: " + k + " pair: " + IntervalPartitionUtil.getIntervalPartition(pid, k)); + tmp = pid; + } processTuple(i, pid); ipjd.buildIncrementCount(pid); + buildSize++; } } public void closeBuild() throws HyracksDataException { + System.err.println("buildSize: " + buildSize); + int inMemoryPartitions = 0; int totalBuildPartitions = 0; flushAndClearBuildSpilledPartition(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/23eab43d/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppender.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppender.java index 8cae721..d242daa 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppender.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppender.java @@ -181,6 +181,7 @@ public class DeletableFrameTupleAppender implements IAppendDeletableFrameTupleAc this.array = buffer.array(); setIndexCount(0); setDeletedSpace(0); + setNextIndex(0); setTupleAppend(0); resetCounts(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/23eab43d/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppenderTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppenderTest.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppenderTest.java index 7686540..af3cdfc 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppenderTest.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppenderTest.java @@ -37,15 +37,19 @@ import org.apache.hyracks.util.string.UTF8StringUtil; import org.junit.Before; import org.junit.Test; +/** + * @see org.apache.hyracks.dataflow.std.sort.util.DeletableFrameTupleAppender + */ public class DeletableFrameTupleAppenderTest { + private static final int META_DATA_SIZE = 4 + 4 + 4 + 4; + private static final int SLOT_SIZE = 4 + 4; + private static final char TEST_CH = 'x'; + DeletableFrameTupleAppender appender; - ISerializerDeserializer[] fields = new ISerializerDeserializer[] { - IntegerSerializerDeserializer.INSTANCE, - new UTF8StringSerializerDeserializer(), - }; + ISerializerDeserializer[] fields = new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE, + new UTF8StringSerializerDeserializer(), }; RecordDescriptor recordDescriptor = new RecordDescriptor(fields); ArrayTupleBuilder builder = new ArrayTupleBuilder(recordDescriptor.getFieldCount()); - static final char TEST_CH = 'x'; int cap = 256; @@ -60,26 +64,42 @@ public class DeletableFrameTupleAppenderTest { appender.clear(buffer); assertTrue(appender.getBuffer() == buffer); assertTrue(appender.getTupleCount() == 0); - assertTrue(appender.getContiguousFreeSpace() == cap - 4 - 4); + assertTrue(appender.getTotalFreeSpace() == cap - META_DATA_SIZE); + assertTrue(appender.getContiguousFreeSpace() == cap - META_DATA_SIZE); } ByteBuffer makeAFrame(int capacity, int count, int deletedBytes) throws HyracksDataException { ByteBuffer buffer = ByteBuffer.allocate(capacity); int metaOffset = capacity - 4; + buffer.putInt(metaOffset, count); + metaOffset -= 4; buffer.putInt(metaOffset, deletedBytes); + // next index metaOffset -= 4; buffer.putInt(metaOffset, count); + // append slot + metaOffset -= 4; + int appendOffset = metaOffset; + buffer.putInt(metaOffset, 0); metaOffset -= 4; + + int start = 0; for (int i = 0; i < count; i++, metaOffset -= 4) { makeARecord(builder, i); for (int x = 0; x < builder.getFieldEndOffsets().length; x++) { buffer.putInt(builder.getFieldEndOffsets()[x]); } buffer.put(builder.getByteArray(), 0, builder.getSize()); - assert (metaOffset > buffer.position()); + + // Add slot information + buffer.putInt(metaOffset, start); + metaOffset -= 4; buffer.putInt(metaOffset, buffer.position()); + start = buffer.position(); + assert (metaOffset > buffer.position()); } + buffer.putInt(appendOffset, start); return buffer; } @@ -110,16 +130,16 @@ public class DeletableFrameTupleAppenderTest { appender.reset(buffer); assertTrue(appender.getBuffer() == buffer); assertTrue(appender.getTupleCount() == 0); - assertTrue(appender.getContiguousFreeSpace() == cap - 4 - 4); + assertTrue(appender.getContiguousFreeSpace() == cap - META_DATA_SIZE); - int count = 10; + int count = 8; int deleted = 7; buffer = makeAFrame(cap, count, deleted); int pos = buffer.position(); appender.reset(buffer); assertTrue(appender.getBuffer() == buffer); assertTrue(appender.getTupleCount() == count); - assertTrue(appender.getContiguousFreeSpace() == cap - 4 - 4 - count * 4 - pos); + assertTrue(appender.getContiguousFreeSpace() == cap - META_DATA_SIZE - count * SLOT_SIZE - pos); assertTrue(appender.getTotalFreeSpace() == appender.getContiguousFreeSpace() + deleted); int dataOffset = 0; @@ -130,7 +150,7 @@ public class DeletableFrameTupleAppenderTest { @Test public void testAppend() throws Exception { - int count = 10; + int count = 8; ByteBuffer bufferRead = makeAFrame(cap, count, 0); DeletableFrameTupleAppender accessor = new DeletableFrameTupleAppender(recordDescriptor); accessor.reset(bufferRead); @@ -146,7 +166,7 @@ public class DeletableFrameTupleAppenderTest { @Test public void testDelete() throws Exception { - int count = 10; + int count = 8; int deleteSpace = 0; ByteBuffer buffer = makeAFrame(cap, count, deleteSpace); appender.reset(buffer); @@ -165,7 +185,7 @@ public class DeletableFrameTupleAppenderTest { public void testResetAfterDelete() throws Exception { testDelete(); appender.reset(appender.getBuffer()); - assertEquals(cap - appender.getTupleCount() * 4 - 4 - 4, appender.getTotalFreeSpace()); + assertEquals(cap - appender.getTupleCount() * SLOT_SIZE - META_DATA_SIZE, appender.getTotalFreeSpace()); } @@ -187,7 +207,7 @@ public class DeletableFrameTupleAppenderTest { @Test public void testAppendAndDelete() throws Exception { int cap = 1024; - int count = 10; + int count = 8; int deleteSpace = 0; ByteBuffer buffer = makeAFrame(cap, count, deleteSpace); int dataOffset = buffer.position(); @@ -221,7 +241,7 @@ public class DeletableFrameTupleAppenderTest { @Test public void testReOrganizeBuffer() throws Exception { - int count = 10; + int count = 8; testDelete(); appender.reOrganizeBuffer(); ByteBuffer bufferRead = makeAFrame(cap, count, 0);