asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jianf...@apache.org
Subject [02/14] incubator-asterixdb-hyracks git commit: VariableSizeFrame(VSizeFrame) support for Hyracks.
Date Thu, 18 Jun 2015 04:22:19 GMT
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-examples/text-example/texthelper/src/main/java/edu/uci/ics/hyracks/examples/text/WordTupleParserFactory.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-examples/text-example/texthelper/src/main/java/edu/uci/ics/hyracks/examples/text/WordTupleParserFactory.java b/hyracks/hyracks-examples/text-example/texthelper/src/main/java/edu/uci/ics/hyracks/examples/text/WordTupleParserFactory.java
index e46e685..790552c 100644
--- a/hyracks/hyracks-examples/text-example/texthelper/src/main/java/edu/uci/ics/hyracks/examples/text/WordTupleParserFactory.java
+++ b/hyracks/hyracks-examples/text-example/texthelper/src/main/java/edu/uci/ics/hyracks/examples/text/WordTupleParserFactory.java
@@ -19,10 +19,10 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.Reader;
-import java.nio.ByteBuffer;
 import java.util.Arrays;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
@@ -42,9 +42,7 @@ public class WordTupleParserFactory implements ITupleParserFactory {
             @Override
             public void parse(InputStream in, IFrameWriter writer) throws HyracksDataException {
                 try {
-                    ByteBuffer frame = ctx.allocateFrame();
-                    FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
-                    appender.reset(frame, true);
+                    FrameTupleAppender appender = new FrameTupleAppender(new VSizeFrame(ctx));
                     ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
                     DataOutput dos = tb.getDataOutput();
 
@@ -54,17 +52,10 @@ public class WordTupleParserFactory implements ITupleParserFactory {
                         tb.reset();
                         utf8StringParser.parse(cursor.buffer, cursor.fStart, cursor.fEnd - cursor.fStart, dos);
                         tb.addFieldEndOffset();
-                        if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                            FrameUtils.flushFrame(frame, writer);
-                            appender.reset(frame, true);
-                            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                                throw new HyracksDataException("Record size (" + tb.getSize() + ") larger than frame size (" + appender.getBuffer().capacity() + ")");
-                            }
-                        }
-                    }
-                    if (appender.getTupleCount() > 0) {
-                        FrameUtils.flushFrame(frame, writer);
+                        FrameUtils.appendToWriter(writer, appender, tb.getFieldEndOffsets(), tb.getByteArray(), 0,
+                                tb.getSize());
                     }
+                    appender.flush(writer, true);
                 } catch (IOException e) {
                     throw new HyracksDataException(e);
                 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml b/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml
index a437f37..e22f27f 100644
--- a/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml
+++ b/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml
@@ -56,7 +56,7 @@
             <configuration>
               <programs>
                 <program>
-                  <mainClass>edu.uci.ics.hyracks.examples.tpch.client.Main</mainClass>
+                  <mainClass>edu.uci.ics.hyracks.examples.tpch.client.Sort</mainClass>
                   <name>tpchclient</name>
                 </program>
               </programs>

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Common.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Common.java b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Common.java
new file mode 100644
index 0000000..17f1d3b
--- /dev/null
+++ b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Common.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.examples.tpch.client;
+
+import java.io.File;
+
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+
+public class Common {
+    static RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+    static RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE });
+    static RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE });
+
+    static IValueParserFactory[] orderParserFactories = new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE };
+
+
+    static FileSplit[] parseFileSplits(String fileSplits) {
+        String[] splits = fileSplits.split(",");
+        FileSplit[] fSplits = new FileSplit[splits.length];
+        for (int i = 0; i < splits.length; ++i) {
+            String s = splits[i].trim();
+            int idx = s.indexOf(':');
+            if (idx < 0) {
+                throw new IllegalArgumentException("File split " + s + " not well formed");
+            }
+            fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(new File(s.substring(idx + 1))));
+        }
+        return fSplits;
+    }
+
+    static void createPartitionConstraint(JobSpecification spec, IOperatorDescriptor op, FileSplit[] splits) {
+        String[] parts = new String[splits.length];
+        for (int i = 0; i < splits.length; ++i) {
+            parts[i] = splits[i].getNodeName();
+        }
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, op, parts);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Join.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Join.java b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Join.java
new file mode 100644
index 0000000..748d809
--- /dev/null
+++ b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Join.java
@@ -0,0 +1,320 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.examples.tpch.client;
+
+import static edu.uci.ics.hyracks.examples.tpch.client.Common.*;
+
+import java.util.EnumSet;
+
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
+
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.FrameFileWriterOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.CountFieldAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.hash.HashGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.join.GraceHashJoinOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.join.HybridHashJoinOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.join.InMemoryHashJoinOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.join.NestedLoopJoinOperatorDescriptor;
+
+public class Join {
+    private static class Options {
+        @Option(name = "-host", usage = "Hyracks Cluster Controller Host name", required = true)
+        public String host;
+
+        @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1098)", required = false)
+        public int port = 1098;
+
+        @Option(name = "-infile-customer-splits", usage = "Comma separated list of file-splits for the CUSTOMER input. A file-split is <node-name>:<path>", required = true)
+        public String inFileCustomerSplits;
+
+        @Option(name = "-infile-order-splits", usage = "Comma separated list of file-splits for the ORDER input. A file-split is <node-name>:<path>", required = true)
+        public String inFileOrderSplits;
+
+        @Option(name = "-outfile-splits", usage = "Comma separated list of file-splits for the output", required = true)
+        public String outFileSplits;
+
+        @Option(name = "-num-join-partitions", usage = "Number of Join partitions to use (default: 1)", required = false)
+        public int numJoinPartitions = 1;
+
+        @Option(name = "-profile", usage = "Enable/Disable profiling. (default: enabled)")
+        public boolean profile = true;
+
+        @Option(name = "-table-size", usage = "Table size for in-memory hash join", required = false)
+        public int tableSize = 8191;
+
+        @Option(name = "-algo", usage = "Join types", required = true)
+        public String algo;
+
+        // For grace/hybrid hash join only
+        @Option(name = "-mem-size", usage = "Memory size for hash join", required = true)
+        public int memSize;
+
+        @Option(name = "-input-size", usage = "Input size of the grace/hybrid hash join", required = false)
+        public int graceInputSize = 10;
+
+        @Option(name = "-records-per-frame", usage = "Records per frame for grace/hybrid hash join", required = false)
+        public int graceRecordsPerFrame = 200;
+
+        @Option(name = "-grace-factor", usage = "Factor of the grace/hybrid hash join", required = false)
+        public double graceFactor = 1.2;
+
+        // Whether group-by is processed after the join
+        @Option(name = "-has-groupby", usage = "Whether to have group-by operation after join (default: disabled)", required = false)
+        public boolean hasGroupBy = false;
+
+        @Option(name = "-frame-size", usage = "Hyracks frame size (default: 32768)", required = false)
+        public int frameSize = 32768;
+    }
+
+    public static void main(String[] args) throws Exception {
+        Options options = new Options();
+        CmdLineParser parser = new CmdLineParser(options);
+        parser.parseArgument(args);
+
+        IHyracksClientConnection hcc = new HyracksConnection(options.host, options.port);
+
+        JobSpecification job = createJob(parseFileSplits(options.inFileCustomerSplits),
+                parseFileSplits(options.inFileOrderSplits), parseFileSplits(options.outFileSplits),
+                options.numJoinPartitions, options.algo, options.graceInputSize, options.graceRecordsPerFrame,
+                options.graceFactor, options.memSize, options.tableSize, options.hasGroupBy, options.frameSize);
+
+        long start = System.currentTimeMillis();
+        JobId jobId = hcc.startJob(job,
+                options.profile ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
+        hcc.waitForCompletion(jobId);
+        long end = System.currentTimeMillis();
+        System.err.println(start + " " + end + " " + (end - start));
+    }
+
+    private static JobSpecification createJob(FileSplit[] customerSplits, FileSplit[] orderSplits,
+            FileSplit[] resultSplits, int numJoinPartitions, String algo, int graceInputSize, int graceRecordsPerFrame,
+            double graceFactor, int memSize, int tableSize, boolean hasGroupBy, int frameSize)
+            throws HyracksDataException {
+        JobSpecification spec = new JobSpecification(frameSize);
+
+        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(customerSplits);
+
+        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(orderSplits);
+
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), Common.ordersDesc);
+        createPartitionConstraint(spec, ordScanner, orderSplits);
+
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE }, '|'), Common.custDesc);
+        createPartitionConstraint(spec, custScanner, customerSplits);
+
+        IOperatorDescriptor join;
+
+        if ("nestedloop".equalsIgnoreCase(algo)) {
+            join = new NestedLoopJoinOperatorDescriptor(spec, new JoinComparatorFactory(
+                    PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1), Common.custOrderJoinDesc,
+                    memSize, false, null);
+
+        } else if ("gracehash".equalsIgnoreCase(algo)) {
+            join = new GraceHashJoinOperatorDescriptor(
+                    spec,
+                    memSize,
+                    graceInputSize,
+                    graceRecordsPerFrame,
+                    graceFactor,
+                    new int[] { 0 },
+                    new int[] { 1 },
+                    new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                            .of(UTF8StringPointable.FACTORY) },
+                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                    Common.custOrderJoinDesc, null);
+
+        } else if ("hybridhash".equalsIgnoreCase(algo)) {
+            join = new HybridHashJoinOperatorDescriptor(
+                    spec,
+                    memSize,
+                    graceInputSize,
+                    graceRecordsPerFrame,
+                    graceFactor,
+                    new int[] { 0 },
+                    new int[] { 1 },
+                    new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                            .of(UTF8StringPointable.FACTORY) },
+                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                    Common.custOrderJoinDesc, null);
+
+        } else {
+            join = new InMemoryHashJoinOperatorDescriptor(
+                    spec,
+                    new int[] { 0 },
+                    new int[] { 1 },
+                    new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                            .of(UTF8StringPointable.FACTORY) },
+                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                    Common.custOrderJoinDesc, 6000000, null);
+        }
+
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, join, numJoinPartitions);
+
+        IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(new int[] { 1 },
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
+        spec.connect(ordJoinConn, ordScanner, 0, join, 1);
+
+        IConnectorDescriptor custJoinConn = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(new int[] { 0 },
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
+        spec.connect(custJoinConn, custScanner, 0, join, 0);
+
+        IOperatorDescriptor endingOp = join;
+
+        if (hasGroupBy) {
+
+            RecordDescriptor groupResultDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                    UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+
+            HashGroupOperatorDescriptor gby = new HashGroupOperatorDescriptor(
+                    spec,
+                    new int[] { 6 },
+                    new FieldHashPartitionComputerFactory(new int[] { 6 },
+                            new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                    .of(UTF8StringPointable.FACTORY) }),
+                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                    new MultiFieldsAggregatorFactory(
+                            new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
+                    groupResultDesc, 16);
+            createPartitionConstraint(spec, gby, resultSplits);
+
+            IConnectorDescriptor joinGroupConn = new MToNPartitioningConnectorDescriptor(spec,
+                    new FieldHashPartitionComputerFactory(new int[] { 6 },
+                            new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                    .of(UTF8StringPointable.FACTORY) }));
+            spec.connect(joinGroupConn, join, 0, gby, 0);
+
+            endingOp = gby;
+        }
+
+        IFileSplitProvider outSplitProvider = new ConstantFileSplitProvider(resultSplits);
+        FrameFileWriterOperatorDescriptor writer = new FrameFileWriterOperatorDescriptor(spec, outSplitProvider);
+        createPartitionConstraint(spec, writer, resultSplits);
+
+        IConnectorDescriptor endingPrinterConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(endingPrinterConn, endingOp, 0, writer, 0);
+
+        spec.addRoot(writer);
+        return spec;
+    }
+
+
+
+    static class JoinComparatorFactory implements ITuplePairComparatorFactory {
+        private static final long serialVersionUID = 1L;
+
+        private final IBinaryComparatorFactory bFactory;
+        private final int pos0;
+        private final int pos1;
+
+        public JoinComparatorFactory(IBinaryComparatorFactory bFactory, int pos0, int pos1) {
+            this.bFactory = bFactory;
+            this.pos0 = pos0;
+            this.pos1 = pos1;
+        }
+
+        @Override
+        public ITuplePairComparator createTuplePairComparator(IHyracksTaskContext ctx) {
+            return new JoinComparator(bFactory.createBinaryComparator(), pos0, pos1);
+        }
+    }
+
+    static class JoinComparator implements ITuplePairComparator {
+
+        private final IBinaryComparator bComparator;
+        private final int field0;
+        private final int field1;
+
+        public JoinComparator(IBinaryComparator bComparator, int field0, int field1) {
+            this.bComparator = bComparator;
+            this.field0 = field0;
+            this.field1 = field1;
+        }
+
+        @Override
+        public int compare(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1, int tIndex1)
+                throws HyracksDataException {
+            int tStart0 = accessor0.getTupleStartOffset(tIndex0);
+            int fStartOffset0 = accessor0.getFieldSlotsLength() + tStart0;
+
+            int tStart1 = accessor1.getTupleStartOffset(tIndex1);
+            int fStartOffset1 = accessor1.getFieldSlotsLength() + tStart1;
+
+            int fStart0 = accessor0.getFieldStartOffset(tIndex0, field0);
+            int fEnd0 = accessor0.getFieldEndOffset(tIndex0, field0);
+            int fLen0 = fEnd0 - fStart0;
+
+            int fStart1 = accessor1.getFieldStartOffset(tIndex1, field1);
+            int fEnd1 = accessor1.getFieldEndOffset(tIndex1, field1);
+            int fLen1 = fEnd1 - fStart1;
+
+            int c = bComparator.compare(accessor0.getBuffer().array(), fStart0 + fStartOffset0, fLen0, accessor1
+                    .getBuffer().array(), fStart1 + fStartOffset1, fLen1);
+            if (c != 0) {
+                return c;
+            }
+            return 0;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
deleted file mode 100644
index 1d4e6ce..0000000
--- a/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
+++ /dev/null
@@ -1,362 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.examples.tpch.client;
-
-import java.io.File;
-import java.util.EnumSet;
-
-import org.kohsuke.args4j.CmdLineParser;
-import org.kohsuke.args4j.Option;
-
-import edu.uci.ics.hyracks.api.client.HyracksConnection;
-import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.api.job.JobFlag;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
-import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
-import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
-import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
-import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
-import edu.uci.ics.hyracks.dataflow.std.file.FrameFileWriterOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.group.aggregators.CountFieldAggregatorFactory;
-import edu.uci.ics.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
-import edu.uci.ics.hyracks.dataflow.std.group.hash.HashGroupOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.join.GraceHashJoinOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.join.HybridHashJoinOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.join.InMemoryHashJoinOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.join.NestedLoopJoinOperatorDescriptor;
-
-public class Main {
-    private static class Options {
-        @Option(name = "-host", usage = "Hyracks Cluster Controller Host name", required = true)
-        public String host;
-
-        @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1098)", required = false)
-        public int port = 1098;
-
-        @Option(name = "-infile-customer-splits", usage = "Comma separated list of file-splits for the CUSTOMER input. A file-split is <node-name>:<path>", required = true)
-        public String inFileCustomerSplits;
-
-        @Option(name = "-infile-order-splits", usage = "Comma separated list of file-splits for the ORDER input. A file-split is <node-name>:<path>", required = true)
-        public String inFileOrderSplits;
-
-        @Option(name = "-outfile-splits", usage = "Comma separated list of file-splits for the output", required = true)
-        public String outFileSplits;
-
-        @Option(name = "-num-join-partitions", usage = "Number of Join partitions to use (default: 1)", required = false)
-        public int numJoinPartitions = 1;
-
-        @Option(name = "-profile", usage = "Enable/Disable profiling. (default: enabled)")
-        public boolean profile = true;
-
-        @Option(name = "-table-size", usage = "Table size for in-memory hash join", required = false)
-        public int tableSize = 8191;
-
-        @Option(name = "-algo", usage = "Join types", required = true)
-        public String algo;
-
-        // For grace/hybrid hash join only
-        @Option(name = "-mem-size", usage = "Memory size for hash join", required = true)
-        public int memSize;
-
-        @Option(name = "-input-size", usage = "Input size of the grace/hybrid hash join", required = false)
-        public int graceInputSize = 10;
-
-        @Option(name = "-records-per-frame", usage = "Records per frame for grace/hybrid hash join", required = false)
-        public int graceRecordsPerFrame = 200;
-
-        @Option(name = "-grace-factor", usage = "Factor of the grace/hybrid hash join", required = false)
-        public double graceFactor = 1.2;
-
-        // Whether group-by is processed after the join
-        @Option(name = "-has-groupby", usage = "Whether to have group-by operation after join (default: disabled)", required = false)
-        public boolean hasGroupBy = false;
-
-        @Option(name = "-frame-size", usage = "Hyracks frame size (default: 32768)", required = false)
-        public int frameSize = 32768;
-    }
-
-    public static void main(String[] args) throws Exception {
-        Options options = new Options();
-        CmdLineParser parser = new CmdLineParser(options);
-        parser.parseArgument(args);
-
-        IHyracksClientConnection hcc = new HyracksConnection(options.host, options.port);
-
-        JobSpecification job = createJob(parseFileSplits(options.inFileCustomerSplits),
-                parseFileSplits(options.inFileOrderSplits), parseFileSplits(options.outFileSplits),
-                options.numJoinPartitions, options.algo, options.graceInputSize, options.graceRecordsPerFrame,
-                options.graceFactor, options.memSize, options.tableSize, options.hasGroupBy, options.frameSize);
-
-        long start = System.currentTimeMillis();
-        JobId jobId = hcc.startJob(job,
-                options.profile ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
-        hcc.waitForCompletion(jobId);
-        long end = System.currentTimeMillis();
-        System.err.println(start + " " + end + " " + (end - start));
-    }
-
-    private static FileSplit[] parseFileSplits(String fileSplits) {
-        String[] splits = fileSplits.split(",");
-        FileSplit[] fSplits = new FileSplit[splits.length];
-        for (int i = 0; i < splits.length; ++i) {
-            String s = splits[i].trim();
-            int idx = s.indexOf(':');
-            if (idx < 0) {
-                throw new IllegalArgumentException("File split " + s + " not well formed");
-            }
-            fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(new File(s.substring(idx + 1))));
-        }
-        return fSplits;
-    }
-
-    private static JobSpecification createJob(FileSplit[] customerSplits, FileSplit[] orderSplits,
-            FileSplit[] resultSplits, int numJoinPartitions, String algo, int graceInputSize, int graceRecordsPerFrame,
-            double graceFactor, int memSize, int tableSize, boolean hasGroupBy, int frameSize)
-            throws HyracksDataException {
-        JobSpecification spec = new JobSpecification(frameSize);
-
-        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(customerSplits);
-        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
-
-        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(orderSplits);
-        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE });
-
-        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE });
-
-        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
-        createPartitionConstraint(spec, ordScanner, orderSplits);
-
-        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
-        createPartitionConstraint(spec, custScanner, customerSplits);
-
-        IOperatorDescriptor join;
-
-        if ("nestedloop".equalsIgnoreCase(algo)) {
-            join = new NestedLoopJoinOperatorDescriptor(spec, new JoinComparatorFactory(
-                    PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1), custOrderJoinDesc,
-                    memSize, false, null);
-
-        } else if ("gracehash".equalsIgnoreCase(algo)) {
-            join = new GraceHashJoinOperatorDescriptor(
-                    spec,
-                    memSize,
-                    graceInputSize,
-                    graceRecordsPerFrame,
-                    graceFactor,
-                    new int[] { 0 },
-                    new int[] { 1 },
-                    new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-                            .of(UTF8StringPointable.FACTORY) },
-                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                    custOrderJoinDesc, null);
-
-        } else if ("hybridhash".equalsIgnoreCase(algo)) {
-            join = new HybridHashJoinOperatorDescriptor(
-                    spec,
-                    memSize,
-                    graceInputSize,
-                    graceRecordsPerFrame,
-                    graceFactor,
-                    new int[] { 0 },
-                    new int[] { 1 },
-                    new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-                            .of(UTF8StringPointable.FACTORY) },
-                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                    custOrderJoinDesc, null);
-
-        } else {
-            join = new InMemoryHashJoinOperatorDescriptor(
-                    spec,
-                    new int[] { 0 },
-                    new int[] { 1 },
-                    new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-                            .of(UTF8StringPointable.FACTORY) },
-                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                    custOrderJoinDesc, 6000000, null);
-        }
-
-        PartitionConstraintHelper.addPartitionCountConstraint(spec, join, numJoinPartitions);
-
-        IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(new int[] { 1 },
-                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-                                .of(UTF8StringPointable.FACTORY) }));
-        spec.connect(ordJoinConn, ordScanner, 0, join, 1);
-
-        IConnectorDescriptor custJoinConn = new MToNPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(new int[] { 0 },
-                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-                                .of(UTF8StringPointable.FACTORY) }));
-        spec.connect(custJoinConn, custScanner, 0, join, 0);
-
-        IOperatorDescriptor endingOp = join;
-
-        if (hasGroupBy) {
-
-            RecordDescriptor groupResultDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                    UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
-
-            HashGroupOperatorDescriptor gby = new HashGroupOperatorDescriptor(
-                    spec,
-                    new int[] { 6 },
-                    new FieldHashPartitionComputerFactory(new int[] { 6 },
-                            new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-                                    .of(UTF8StringPointable.FACTORY) }),
-                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                    new MultiFieldsAggregatorFactory(
-                            new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
-                    groupResultDesc, 16);
-            createPartitionConstraint(spec, gby, resultSplits);
-
-            IConnectorDescriptor joinGroupConn = new MToNPartitioningConnectorDescriptor(spec,
-                    new FieldHashPartitionComputerFactory(new int[] { 6 },
-                            new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-                                    .of(UTF8StringPointable.FACTORY) }));
-            spec.connect(joinGroupConn, join, 0, gby, 0);
-
-            endingOp = gby;
-        }
-
-        IFileSplitProvider outSplitProvider = new ConstantFileSplitProvider(resultSplits);
-        FrameFileWriterOperatorDescriptor writer = new FrameFileWriterOperatorDescriptor(spec, outSplitProvider);
-        createPartitionConstraint(spec, writer, resultSplits);
-
-        IConnectorDescriptor endingPrinterConn = new OneToOneConnectorDescriptor(spec);
-        spec.connect(endingPrinterConn, endingOp, 0, writer, 0);
-
-        spec.addRoot(writer);
-        return spec;
-    }
-
-    private static void createPartitionConstraint(JobSpecification spec, IOperatorDescriptor op, FileSplit[] splits) {
-        String[] parts = new String[splits.length];
-        for (int i = 0; i < splits.length; ++i) {
-            parts[i] = splits[i].getNodeName();
-        }
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, op, parts);
-    }
-
-    static class JoinComparatorFactory implements ITuplePairComparatorFactory {
-        private static final long serialVersionUID = 1L;
-
-        private final IBinaryComparatorFactory bFactory;
-        private final int pos0;
-        private final int pos1;
-
-        public JoinComparatorFactory(IBinaryComparatorFactory bFactory, int pos0, int pos1) {
-            this.bFactory = bFactory;
-            this.pos0 = pos0;
-            this.pos1 = pos1;
-        }
-
-        @Override
-        public ITuplePairComparator createTuplePairComparator(IHyracksTaskContext ctx) {
-            return new JoinComparator(bFactory.createBinaryComparator(), pos0, pos1);
-        }
-    }
-
-    static class JoinComparator implements ITuplePairComparator {
-
-        private final IBinaryComparator bComparator;
-        private final int field0;
-        private final int field1;
-
-        public JoinComparator(IBinaryComparator bComparator, int field0, int field1) {
-            this.bComparator = bComparator;
-            this.field0 = field0;
-            this.field1 = field1;
-        }
-
-        @Override
-        public int compare(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1, int tIndex1)
-                throws HyracksDataException {
-            int tStart0 = accessor0.getTupleStartOffset(tIndex0);
-            int fStartOffset0 = accessor0.getFieldSlotsLength() + tStart0;
-
-            int tStart1 = accessor1.getTupleStartOffset(tIndex1);
-            int fStartOffset1 = accessor1.getFieldSlotsLength() + tStart1;
-
-            int fStart0 = accessor0.getFieldStartOffset(tIndex0, field0);
-            int fEnd0 = accessor0.getFieldEndOffset(tIndex0, field0);
-            int fLen0 = fEnd0 - fStart0;
-
-            int fStart1 = accessor1.getFieldStartOffset(tIndex1, field1);
-            int fEnd1 = accessor1.getFieldEndOffset(tIndex1, field1);
-            int fLen1 = fEnd1 - fStart1;
-
-            int c = bComparator.compare(accessor0.getBuffer().array(), fStart0 + fStartOffset0, fLen0, accessor1
-                    .getBuffer().array(), fStart1 + fStartOffset1, fLen1);
-            if (c != 0) {
-                return c;
-            }
-            return 0;
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Sort.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Sort.java b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Sort.java
new file mode 100644
index 0000000..7570b0b
--- /dev/null
+++ b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Sort.java
@@ -0,0 +1,165 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.examples.tpch.client;
+
+import static edu.uci.ics.hyracks.examples.tpch.client.Common.createPartitionConstraint;
+import static edu.uci.ics.hyracks.examples.tpch.client.Common.orderParserFactories;
+import static edu.uci.ics.hyracks.examples.tpch.client.Common.ordersDesc;
+import static edu.uci.ics.hyracks.examples.tpch.client.Common.parseFileSplits;
+
+import java.util.EnumSet;
+
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
+
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
+import edu.uci.ics.hyracks.dataflow.common.data.normalizers.UTF8StringNormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.AbstractSorterOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.Algorithm;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.TopKSorterOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.EnumFreeSlotPolicy;
+
+public class Sort {
+    private static class Options {
+        @Option(name = "-host", usage = "Hyracks Cluster Controller Host name", required = true)
+        public String host;
+
+        @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1098)", required = false)
+        public int port = 1098;
+
+        @Option(name = "-frame-size", usage = "Hyracks frame size (default: 32768)", required = false)
+        public int frameSize = 32768;
+
+        @Option(name = "-frame-limit", usage = "memory limit for sorting (default: 4)", required = false)
+        public int frameLimit = 4;
+
+        @Option(name = "-infile-splits", usage = "Comma separated list of file-splits for the ORDER input. A file-split is <node-name>:<path>", required = true)
+        public String inFileOrderSplits;
+
+        @Option(name = "-outfile-splits", usage = "Comma separated list of file-splits for the output", required = true)
+        public String outFileSplits;
+
+        @Option(name = "-membuffer-alg", usage = "bestfit or lastfit (default: lastfit)", required = false)
+        public String memBufferAlg = "lastfit";
+
+        @Option(name = "-profile", usage = "Enable/Disable profiling. (default: enabled)")
+        public boolean profile = true;
+
+        @Option(name = "-topK", usage = "only output topK for each node. (default: not set)")
+        public int topK = Integer.MAX_VALUE;
+
+        @Option(name = "-heapSort", usage = "using heap sort for topK result. (default: false)")
+        public boolean usingHeapSorter = false;
+    }
+
+    static int[] SortFields = new int[] { 1, 0 };
+    static IBinaryComparatorFactory[] SortFieldsComparatorFactories = new IBinaryComparatorFactory[] {
+            PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+            PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) };
+
+    static IBinaryHashFunctionFactory[] orderBinaryHashFunctionFactories = new IBinaryHashFunctionFactory[] {
+            PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+            PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) };
+
+    public static void main(String[] args) throws Exception {
+        Options options = new Options();
+        CmdLineParser parser = new CmdLineParser(options);
+        if (args.length == 0) {
+            parser.printUsage(System.err);
+            return;
+        }
+        parser.parseArgument(args);
+
+        IHyracksClientConnection hcc = new HyracksConnection(options.host, options.port);
+
+        JobSpecification job = createJob(parseFileSplits(options.inFileOrderSplits),
+                parseFileSplits(options.outFileSplits),
+                options.memBufferAlg, options.frameLimit, options.frameSize, options.topK, options.usingHeapSorter);
+
+        long start = System.currentTimeMillis();
+        JobId jobId = hcc.startJob(job,
+                options.profile ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
+        hcc.waitForCompletion(jobId);
+        long end = System.currentTimeMillis();
+        System.err.println("finished in:" + (end - start) + "ms");
+    }
+
+    private static JobSpecification createJob(FileSplit[] ordersSplits, FileSplit[] outputSplit, String memBufferAlg,
+            int frameLimit, int frameSize, int limit, boolean usingHeapSorter) {
+        JobSpecification spec = new JobSpecification();
+
+        spec.setFrameSize(frameSize);
+        IFileSplitProvider ordersSplitProvider = new ConstantFileSplitProvider(ordersSplits);
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitProvider,
+                new DelimitedDataTupleParserFactory(orderParserFactories, '|'), ordersDesc);
+        createPartitionConstraint(spec, ordScanner, ordersSplits);
+        AbstractSorterOperatorDescriptor sorter;
+        if (usingHeapSorter && limit < Integer.MAX_VALUE) {
+            sorter = new TopKSorterOperatorDescriptor(spec, frameLimit, limit, SortFields, null,
+                    SortFieldsComparatorFactories, ordersDesc);
+        } else {
+            if (memBufferAlg.equalsIgnoreCase("bestfit")) {
+                sorter = new ExternalSortOperatorDescriptor(spec, frameLimit, SortFields,
+                        null, SortFieldsComparatorFactories, ordersDesc, Algorithm.MERGE_SORT,
+                        EnumFreeSlotPolicy.SMALLEST_FIT, limit);
+            } else if (memBufferAlg.equalsIgnoreCase("biggestfit")) {
+                sorter = new ExternalSortOperatorDescriptor(spec, frameLimit, SortFields, null,
+                        SortFieldsComparatorFactories, ordersDesc, Algorithm.MERGE_SORT, EnumFreeSlotPolicy.BIGGEST_FIT,
+                        limit);
+            } else {
+                sorter = new ExternalSortOperatorDescriptor(spec, frameLimit, SortFields, null,
+                        SortFieldsComparatorFactories, ordersDesc, Algorithm.MERGE_SORT, EnumFreeSlotPolicy.LAST_FIT,
+                        limit);
+
+            }
+        }
+        createPartitionConstraint(spec, sorter, ordersSplits);
+        IFileSplitProvider outputSplitProvider = new ConstantFileSplitProvider(outputSplit);
+        IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outputSplitProvider, "|");
+        createPartitionConstraint(spec, printer, outputSplit);
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
+
+        spec.connect(
+                new MToNPartitioningMergingConnectorDescriptor(spec,
+                        new FieldHashPartitionComputerFactory(SortFields, orderBinaryHashFunctionFactories),
+                        SortFields, SortFieldsComparatorFactories, new UTF8StringNormalizedKeyComputerFactory()),
+                sorter, 0, printer, 0);
+
+        spec.addRoot(printer);
+        return spec;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
index 4e48e9b..cb1ca87 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
@@ -76,7 +76,7 @@ public class HDFSWriteOperatorDescriptor extends AbstractSingleActivityOperatorD
 
             private FSDataOutputStream dos;
             private RecordDescriptor inputRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);;
-            private FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRd);
+            private FrameTupleAccessor accessor = new FrameTupleAccessor(inputRd);
             private FrameTupleReference tuple = new FrameTupleReference();
             private ITupleWriter tupleWriter;
             private ClassLoader ctxCL;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextKeyValueParserFactory.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextKeyValueParserFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextKeyValueParserFactory.java
index 92cde9d..62cd76a 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextKeyValueParserFactory.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextKeyValueParserFactory.java
@@ -15,12 +15,11 @@
 
 package edu.uci.ics.hyracks.hdfs.lib;
 
-import java.nio.ByteBuffer;
-
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
@@ -37,9 +36,7 @@ public class TextKeyValueParserFactory implements IKeyValueParserFactory<LongWri
             throws HyracksDataException {
 
         final ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
-        final ByteBuffer buffer = ctx.allocateFrame();
-        final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
-        appender.reset(buffer, true);
+        final FrameTupleAppender appender = new FrameTupleAppender(new VSizeFrame(ctx));
 
         return new IKeyValueParser<LongWritable, Text>() {
 
@@ -53,18 +50,13 @@ public class TextKeyValueParserFactory implements IKeyValueParserFactory<LongWri
                     throws HyracksDataException {
                 tb.reset();
                 tb.addField(value.getBytes(), 0, value.getLength());
-                if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                    FrameUtils.flushFrame(buffer, writer);
-                    appender.reset(buffer, true);
-                    if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                        throw new HyracksDataException("tuple cannot be appended into the frame");
-                    }
-                }
+                FrameUtils.appendToWriter(writer, appender, tb.getFieldEndOffsets(), tb.getByteArray(), 0,
+                        tb.getSize());
             }
 
             @Override
             public void close(IFrameWriter writer) throws HyracksDataException {
-                FrameUtils.flushFrame(buffer, writer);
+                appender.flush(writer, false);
             }
 
         };

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
index 068cdfc..0d24ee5 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
@@ -75,7 +75,7 @@ public class HDFSWriteOperatorDescriptor extends AbstractSingleActivityOperatorD
 
             private FSDataOutputStream dos;
             private RecordDescriptor inputRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);;
-            private FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRd);
+            private FrameTupleAccessor accessor = new FrameTupleAccessor(inputRd);
             private FrameTupleReference tuple = new FrameTupleReference();
             private ITupleWriter tupleWriter;
             private ClassLoader ctxCL;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeUpdateSearchOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeUpdateSearchOperatorNodePushable.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeUpdateSearchOperatorNodePushable.java
index e60026e..785258f 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeUpdateSearchOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeUpdateSearchOperatorNodePushable.java
@@ -17,7 +17,6 @@ package edu.uci.ics.hyracks.storage.am.btree.dataflow;
 
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
@@ -64,14 +63,8 @@ public class BTreeUpdateSearchOperatorNodePushable extends BTreeSearchOperatorNo
                 dos.write(tuple.getFieldData(i), tuple.getFieldStart(i), tuple.getFieldLength(i));
                 tb.addFieldEndOffset();
             }
-            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                FrameUtils.flushFrame(writeBuffer, writer);
-                appender.reset(writeBuffer, true);
-                if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                    throw new HyracksDataException("Record size (" + tb.getSize() + ") larger than frame size ("
-                            + appender.getBuffer().capacity() + ")");
-                }
-            }
+            FrameUtils.appendToWriter(writer, appender, tb.getFieldEndOffsets(), tb.getByteArray(), 0,
+                    tb.getSize());
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
index 9f3c5dd..9aec4cb 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
@@ -66,7 +66,7 @@ public class IndexBulkLoadOperatorNodePushable extends
     public void open() throws HyracksDataException {
         RecordDescriptor recDesc = recDescProvider.getInputRecordDescriptor(
                 opDesc.getActivityId(), 0);
-        accessor = new FrameTupleAccessor(ctx.getFrameSize(), recDesc);
+        accessor = new FrameTupleAccessor(recDesc);
         indexHelper.open();
         index = indexHelper.getIndexInstance();
         try {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
index 092fada..48a395b 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
@@ -16,6 +16,8 @@ package edu.uci.ics.hyracks.storage.am.common.dataflow;
 
 import java.nio.ByteBuffer;
 
+import edu.uci.ics.hyracks.api.comm.IFrame;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -45,7 +47,7 @@ public class IndexInsertUpdateDeleteOperatorNodePushable extends AbstractUnaryIn
     protected final PermutingFrameTupleReference tuple = new PermutingFrameTupleReference();
     protected FrameTupleAccessor accessor;
     protected FrameTupleReference frameTuple;
-    protected ByteBuffer writeBuffer;
+    protected IFrame writeBuffer;
     protected IIndexAccessor indexAccessor;
     protected ITupleFilter tupleFilter;
     protected IModificationOperationCallback modCallback;
@@ -63,8 +65,8 @@ public class IndexInsertUpdateDeleteOperatorNodePushable extends AbstractUnaryIn
     @Override
     public void open() throws HyracksDataException {
         RecordDescriptor inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
-        accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRecDesc);
-        writeBuffer = ctx.allocateFrame();
+        accessor = new FrameTupleAccessor(inputRecDesc);
+        writeBuffer = new VSizeFrame(ctx);
         writer.open();
         indexHelper.open();
         IIndex index = indexHelper.getIndexInstance();
@@ -134,8 +136,9 @@ public class IndexInsertUpdateDeleteOperatorNodePushable extends AbstractUnaryIn
             }
         }
         // Pass a copy of the frame to next op.
-        System.arraycopy(buffer.array(), 0, writeBuffer.array(), 0, buffer.capacity());
-        FrameUtils.flushFrame(writeBuffer, writer);
+        writeBuffer.ensureFrameSize(buffer.capacity());
+        FrameUtils.copyAndFlip(buffer, writeBuffer.getBuffer());
+        FrameUtils.flushFrame(writeBuffer.getBuffer(), writer);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
index a3f4e6f..fd727cf 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
@@ -18,6 +18,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -45,7 +46,6 @@ public abstract class IndexSearchOperatorNodePushable extends AbstractUnaryInput
     protected final IIndexDataflowHelper indexHelper;
     protected FrameTupleAccessor accessor;
 
-    protected ByteBuffer writeBuffer;
     protected FrameTupleAppender appender;
     protected ArrayTupleBuilder tb;
     protected DataOutput dos;
@@ -103,7 +103,7 @@ public abstract class IndexSearchOperatorNodePushable extends AbstractUnaryInput
 
     @Override
     public void open() throws HyracksDataException {
-        accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRecDesc);
+        accessor = new FrameTupleAccessor(inputRecDesc);
         writer.open();
         indexHelper.open();
         index = indexHelper.getIndexInstance();
@@ -126,11 +126,9 @@ public abstract class IndexSearchOperatorNodePushable extends AbstractUnaryInput
 
         try {
             searchPred = createSearchPredicate();
-            writeBuffer = ctx.allocateFrame();
             tb = new ArrayTupleBuilder(recordDesc.getFieldCount());
             dos = tb.getDataOutput();
-            appender = new FrameTupleAppender(ctx.getFrameSize());
-            appender.reset(writeBuffer, true);
+            appender = new FrameTupleAppender(new VSizeFrame(ctx), true);
             ISearchOperationCallback searchCallback = opDesc.getSearchOpCallbackFactory()
                     .createSearchOperationCallback(indexHelper.getResourceID(), ctx);
             indexAccessor = index.createAccessor(NoOpOperationCallback.INSTANCE, searchCallback);
@@ -162,27 +160,13 @@ public abstract class IndexSearchOperatorNodePushable extends AbstractUnaryInput
                 dos.write(tuple.getFieldData(i), tuple.getFieldStart(i), tuple.getFieldLength(i));
                 tb.addFieldEndOffset();
             }
-            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                FrameUtils.flushFrame(writeBuffer, writer);
-                appender.reset(writeBuffer, true);
-                if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                    throw new HyracksDataException("Record size (" + tb.getSize() + ") larger than frame size ("
-                            + appender.getBuffer().capacity() + ")");
-                }
-            }
+            FrameUtils.appendToWriter(writer, appender, tb.getFieldEndOffsets(), tb.getByteArray(), 0,
+                    tb.getSize());
         }
 
         if (!matched && retainInput && retainNull) {
-            if (!appender.appendConcat(accessor, tupleIndex, nullTupleBuild.getFieldEndOffsets(),
-                    nullTupleBuild.getByteArray(), 0, nullTupleBuild.getSize())) {
-                FrameUtils.flushFrame(writeBuffer, writer);
-                appender.reset(writeBuffer, true);
-                if (!appender.appendConcat(accessor, tupleIndex, nullTupleBuild.getFieldEndOffsets(),
-                        nullTupleBuild.getByteArray(), 0, nullTupleBuild.getSize())) {
-                    throw new HyracksDataException("Record size larger than frame size ("
-                            + appender.getBuffer().capacity() + ")");
-                }
-            }
+            FrameUtils.appendConcatToWriter(writer, appender, accessor, tupleIndex,
+                    nullTupleBuild.getFieldEndOffsets(), nullTupleBuild.getByteArray(), 0, nullTupleBuild.getSize());
         }
     }
 
@@ -205,9 +189,7 @@ public abstract class IndexSearchOperatorNodePushable extends AbstractUnaryInput
     @Override
     public void close() throws HyracksDataException {
         try {
-            if (appender.getTupleCount() > 0) {
-                FrameUtils.flushFrame(writeBuffer, writer);
-            }
+            appender.flush(writer, true);
             try {
                 cursor.close();
             } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java
index cb6270f..0db195d 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java
@@ -15,8 +15,8 @@
 package edu.uci.ics.hyracks.storage.am.common.dataflow;
 
 import java.io.DataOutput;
-import java.nio.ByteBuffer;
 
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
@@ -60,9 +60,7 @@ public class TreeIndexDiskOrderScanOperatorNodePushable extends AbstractUnaryOut
             try {
                 indexAccessor.diskOrderScan(cursor);
                 int fieldCount = treeIndex.getFieldCount();
-                ByteBuffer frame = ctx.allocateFrame();
-                FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
-                appender.reset(frame, true);
+                FrameTupleAppender appender = new FrameTupleAppender(new VSizeFrame(ctx));
                 ArrayTupleBuilder tb = new ArrayTupleBuilder(fieldCount);
                 DataOutput dos = tb.getDataOutput();
 
@@ -72,21 +70,15 @@ public class TreeIndexDiskOrderScanOperatorNodePushable extends AbstractUnaryOut
 
                     ITupleReference frameTuple = cursor.getTuple();
                     for (int i = 0; i < frameTuple.getFieldCount(); i++) {
-                        dos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i), frameTuple.getFieldLength(i));
+                        dos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i),
+                                frameTuple.getFieldLength(i));
                         tb.addFieldEndOffset();
                     }
 
-                    if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                        FrameUtils.flushFrame(frame, writer);
-                        appender.reset(frame, true);
-                        if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                            throw new HyracksDataException("Record size (" + tb.getSize() + ") larger than frame size (" + appender.getBuffer().capacity() + ")");
-                        }
-                    }
-                }
-                if (appender.getTupleCount() > 0) {
-                    FrameUtils.flushFrame(frame, writer);
+                    FrameUtils.appendToWriter(writer, appender, tb.getFieldEndOffsets(), tb.getByteArray(), 0,
+                            tb.getSize());
                 }
+                appender.flush(writer, true);
             } catch (Exception e) {
                 writer.fail();
                 throw new HyracksDataException(e);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java
index 717d326..f340142 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java
@@ -15,14 +15,13 @@
 package edu.uci.ics.hyracks.storage.am.common.dataflow;
 
 import java.io.DataOutput;
-import java.nio.ByteBuffer;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
@@ -70,18 +69,18 @@ public class TreeIndexStatsOperatorNodePushable extends AbstractUnaryOutputSourc
                     .getInteriorFrameFactory().createFrame(), treeIndex.getFreePageManager().getMetaDataFrameFactory()
                     .createFrame());
             // Write the stats output as a single string field.
-            ByteBuffer frame = ctx.allocateFrame();
-            FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
-            appender.reset(frame, true);
+            FrameTupleAppender appender = new FrameTupleAppender(new VSizeFrame(ctx));
             ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
             DataOutput dos = tb.getDataOutput();
             tb.reset();
             UTF8StringSerializerDeserializer.INSTANCE.serialize(stats.toString(), dos);
             tb.addFieldEndOffset();
             if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                throw new HyracksDataException("Record size (" + tb.getSize() + ") larger than frame size (" + appender.getBuffer().capacity() + ")");
+                throw new HyracksDataException(
+                        "Record size (" + tb.getSize() + ") larger than frame size (" + appender.getBuffer().capacity()
+                                + ")");
             }
-            FrameUtils.flushFrame(frame, writer);
+            appender.flush(writer, false);
         } catch (Exception e) {
             writer.fail();
         } finally {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java
index 5bf52e4..fd693cf 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java
@@ -30,6 +30,12 @@ public class LSMIndexInsertUpdateDeleteOperatorNodePushable extends IndexInsertU
 
     protected FrameTupleAppender appender;
 
+    @Override
+    public void open() throws HyracksDataException {
+        super.open();
+        appender = new FrameTupleAppender(writeBuffer);
+    }
+
     public LSMIndexInsertUpdateDeleteOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
             int partition, int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider, IndexOperation op) {
         super(opDesc, ctx, partition, fieldPermutation, recordDescProvider, op);
@@ -85,8 +91,8 @@ public class LSMIndexInsertUpdateDeleteOperatorNodePushable extends IndexInsertU
                         break;
                     }
                     default: {
-                        throw new HyracksDataException("Unsupported operation " + op
-                                + " in tree index InsertUpdateDelete operator");
+                        throw new HyracksDataException(
+                                "Unsupported operation " + op + " in tree index InsertUpdateDelete operator");
                     }
                 }
             } catch (HyracksDataException e) {
@@ -97,8 +103,9 @@ public class LSMIndexInsertUpdateDeleteOperatorNodePushable extends IndexInsertU
         }
         if (nextFlushTupleIndex == 0) {
             // No partial flushing was necessary. Forward entire frame.
-            System.arraycopy(buffer.array(), 0, writeBuffer.array(), 0, buffer.capacity());
-            FrameUtils.flushFrame(writeBuffer, writer);
+            writeBuffer.ensureFrameSize(buffer.capacity());
+            FrameUtils.copyAndFlip(buffer, writeBuffer.getBuffer());
+            FrameUtils.flushFrame(writeBuffer.getBuffer(), writer);
         } else {
             // Flush remaining partial frame.
             flushPartialFrame(nextFlushTupleIndex, tupleCount);
@@ -106,17 +113,9 @@ public class LSMIndexInsertUpdateDeleteOperatorNodePushable extends IndexInsertU
     }
 
     private void flushPartialFrame(int startTupleIndex, int endTupleIndex) throws HyracksDataException {
-        if (appender == null) {
-            appender = new FrameTupleAppender(ctx.getFrameSize());
-        }
-        appender.reset(writeBuffer, true);
         for (int i = startTupleIndex; i < endTupleIndex; i++) {
-            if (!appender.append(accessor, i)) {
-                throw new HyracksDataException("Record size ("
-                        + (accessor.getTupleEndOffset(i) - accessor.getTupleStartOffset(i))
-                        + ") larger than frame size (" + appender.getBuffer().capacity() + ")");
-            }
+            FrameUtils.appendToWriter(writer, appender, accessor, i);
         }
-        FrameUtils.flushFrame(writeBuffer, writer);
+        appender.flush(writer, true);
     }
 }


Mime
View raw message