asterixdb-commits mailing list archives

From: ima...@apache.org
Subject: [03/85] [abbrv] [partial] incubator-asterixdb-hyracks git commit: Move Pregelix and Hivesterix codebase to new repositories: 1. Move Pregelix codebase to https://github.com/pregelix/pregelix; 2. Move Hivesterix codebase to https://code.google.com/p/hives
Date: Fri, 24 Apr 2015 18:45:30 GMT
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/ffc967fd/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/group/IClusteredAggregatorDescriptorFactory.java
----------------------------------------------------------------------
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/group/IClusteredAggregatorDescriptorFactory.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/group/IClusteredAggregatorDescriptorFactory.java
deleted file mode 100644
index b082cbb..0000000
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/group/IClusteredAggregatorDescriptorFactory.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.pregelix.dataflow.std.group;
-
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-
-/**
- *
- */
-public interface IClusteredAggregatorDescriptorFactory extends Serializable {
-
-    IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
-            RecordDescriptor outRecordDescriptor, int[] keyFields, final int[] keyFieldsInPartialResults,
-            IFrameWriter resultWriter, ByteBuffer outputFrame, FrameTupleAppender appender) throws HyracksDataException;
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/ffc967fd/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/sort/ExternalSortRunGenerator.java
----------------------------------------------------------------------
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/sort/ExternalSortRunGenerator.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/sort/ExternalSortRunGenerator.java
deleted file mode 100644
index c1c41d4..0000000
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/sort/ExternalSortRunGenerator.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.pregelix.dataflow.std.sort;
-
-import java.nio.ByteBuffer;
-import java.util.LinkedList;
-import java.util.List;
-
-import edu.uci.ics.hyracks.api.comm.IFrameReader;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
-import edu.uci.ics.pregelix.dataflow.std.group.ClusteredGroupWriter;
-import edu.uci.ics.pregelix.dataflow.std.group.IClusteredAggregatorDescriptorFactory;
-
-public class ExternalSortRunGenerator implements IFrameWriter {
-    private final IHyracksTaskContext ctx;
-    private final IFrameSorter frameSorter;
-    private final List<IFrameReader> runs;
-    private final int maxSortFrames;
-
-    private final int[] groupFields;
-    private final IBinaryComparator[] comparators;
-    private final IClusteredAggregatorDescriptorFactory aggregatorFactory;
-    private final RecordDescriptor inRecordDesc;
-    private final RecordDescriptor outRecordDesc;
-
-    public ExternalSortRunGenerator(IHyracksTaskContext ctx, int[] sortFields, RecordDescriptor recordDesc,
-            int framesLimit, int[] groupFields, IBinaryComparator[] comparators,
-            IClusteredAggregatorDescriptorFactory aggregatorFactory, RecordDescriptor outRecordDesc)
-            throws HyracksDataException {
-        this.ctx = ctx;
-        this.frameSorter = new FrameSorterQuickSort(ctx, sortFields, recordDesc);
-        this.runs = new LinkedList<IFrameReader>();
-        this.maxSortFrames = framesLimit - 1;
-
-        this.groupFields = groupFields;
-        this.comparators = comparators;
-        this.aggregatorFactory = aggregatorFactory;
-        this.inRecordDesc = recordDesc;
-        this.outRecordDesc = outRecordDesc;
-    }
-
-    @Override
-    public void open() throws HyracksDataException {
-        runs.clear();
-        frameSorter.reset();
-    }
-
-    @Override
-    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        if (frameSorter.getFrameCount() >= maxSortFrames) {
-            flushFramesToRun();
-        }
-        frameSorter.insertFrame(buffer);
-    }
-
-    @Override
-    public void close() throws HyracksDataException {
-        if (frameSorter.getFrameCount() > 0) {
-            if (runs.size() <= 0) {
-                frameSorter.sortFrames();
-            } else {
-                flushFramesToRun();
-            }
-        }
-    }
-
-    private void flushFramesToRun() throws HyracksDataException {
-        frameSorter.sortFrames();
-        FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
-                ExternalSortRunGenerator.class.getSimpleName());
-        RunFileWriter writer = new RunFileWriter(file, ctx.getIOManager());
-        ClusteredGroupWriter pgw = new ClusteredGroupWriter(ctx, groupFields, comparators, aggregatorFactory,
-                this.inRecordDesc, this.outRecordDesc, writer);
-        pgw.open();
-
-        try {
-            frameSorter.flushFrames(pgw);
-        } finally {
-            pgw.close();
-        }
-        frameSorter.reset();
-        runs.add(writer.createReader());
-    }
-
-    @Override
-    public void fail() throws HyracksDataException {
-    }
-
-    public IFrameSorter getFrameSorter() {
-        return frameSorter;
-    }
-
-    public List<IFrameReader> getRuns() {
-        return runs;
-    }
-}
\ No newline at end of file
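
[Editor's note] The removed ExternalSortRunGenerator follows the standard external-sort run-generation pattern: buffer incoming frames up to a frame budget, sort the buffered batch in memory, and spill each sorted batch as a run. If everything fits in memory, the final batch is only sorted, not spilled; each spill is pushed through a ClusteredGroupWriter, so pre-aggregation happens while the run is written. A minimal, self-contained sketch of that control flow over plain integers, with hypothetical names rather than the Hyracks API:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Illustrative run-generation sketch (hypothetical, not part of the commit above).
final class RunGeneratorSketch {
    private final int budget;                         // max records buffered in memory
    private final List<Integer> buffer = new ArrayList<>();
    private final List<List<Integer>> runs = new ArrayList<>();

    RunGeneratorSketch(int budget) {
        this.budget = budget;
    }

    void nextRecord(int record) {
        if (buffer.size() >= budget) {
            flushRun();                               // spill a sorted run once the budget is hit
        }
        buffer.add(record);
    }

    void close() {
        if (!buffer.isEmpty()) {
            if (runs.isEmpty()) {
                Collections.sort(buffer);             // everything fit in memory; no spill needed
            } else {
                flushRun();                           // spill the final partial run
            }
        }
    }

    private void flushRun() {
        Collections.sort(buffer);                     // in-memory sort of the buffered batch
        runs.add(new ArrayList<>(buffer));            // the real operator writes a run file here
        buffer.clear();
    }

    List<List<Integer>> getRuns() {
        return runs;
    }
}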

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/ffc967fd/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/sort/ExternalSortRunMerger.java
----------------------------------------------------------------------
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/sort/ExternalSortRunMerger.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/sort/ExternalSortRunMerger.java
deleted file mode 100644
index ff73ced..0000000
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/sort/ExternalSortRunMerger.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.pregelix.dataflow.std.sort;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-
-import edu.uci.ics.hyracks.api.comm.IFrameReader;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
-import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
-import edu.uci.ics.pregelix.dataflow.std.group.ClusteredGroupWriter;
-import edu.uci.ics.pregelix.dataflow.std.group.IClusteredAggregatorDescriptorFactory;
-
-/**
- * Group-by aggregation is pushed into multi-pass merge of external sort.
- * 
- * @author yingyib
- */
-public class ExternalSortRunMerger {
-
-    private final IHyracksTaskContext ctx;
-    private final List<IFrameReader> runs;
-    private final int[] sortFields;
-    private final RecordDescriptor inRecordDesc;
-    private final RecordDescriptor outRecordDesc;
-    private final int framesLimit;
-    private final IFrameWriter writer;
-    private List<ByteBuffer> inFrames;
-    private ByteBuffer outFrame;
-    private FrameTupleAppender outFrameAppender;
-
-    private IFrameSorter frameSorter; // Used in External sort, no replacement
-                                      // selection
-
-    private final int[] groupFields;
-    private final IBinaryComparator[] comparators;
-    private final IClusteredAggregatorDescriptorFactory aggregatorFactory;
-    private final IClusteredAggregatorDescriptorFactory partialAggregatorFactory;
-    private final boolean localSide;
-
-    // Constructor for external sort, no replacement selection
-    public ExternalSortRunMerger(IHyracksTaskContext ctx, IFrameSorter frameSorter, List<IFrameReader> runs,
-            int[] sortFields, RecordDescriptor inRecordDesc, RecordDescriptor outRecordDesc, int framesLimit,
-            IFrameWriter writer, int[] groupFields, IBinaryComparator[] comparators,
-            IClusteredAggregatorDescriptorFactory partialAggregatorFactory,
-            IClusteredAggregatorDescriptorFactory aggregatorFactory, boolean localSide) {
-        this.ctx = ctx;
-        this.frameSorter = frameSorter;
-        this.runs = new LinkedList<IFrameReader>(runs);
-        this.sortFields = sortFields;
-        this.inRecordDesc = inRecordDesc;
-        this.outRecordDesc = outRecordDesc;
-        this.framesLimit = framesLimit;
-        this.writer = writer;
-
-        this.groupFields = groupFields;
-        this.comparators = comparators;
-        this.aggregatorFactory = aggregatorFactory;
-        this.partialAggregatorFactory = partialAggregatorFactory;
-        this.localSide = localSide;
-    }
-
-    public void process() throws HyracksDataException {
-        ClusteredGroupWriter pgw = new ClusteredGroupWriter(ctx, groupFields, comparators,
-                localSide ? partialAggregatorFactory : aggregatorFactory, inRecordDesc, outRecordDesc, writer);
-        try {
-            if (runs.size() <= 0) {
-                pgw.open();
-                if (frameSorter != null && frameSorter.getFrameCount() > 0) {
-                    frameSorter.flushFrames(pgw);
-                }
-                /** recycle sort buffer */
-                frameSorter.close();
-            } else {
-                /** recycle sort buffer */
-                frameSorter.close();
-
-                inFrames = new ArrayList<ByteBuffer>();
-                outFrame = ctx.allocateFrame();
-                outFrameAppender = new FrameTupleAppender(ctx.getFrameSize());
-                outFrameAppender.reset(outFrame, true);
-                for (int i = 0; i < framesLimit - 1; ++i) {
-                    inFrames.add(ctx.allocateFrame());
-                }
-                int maxMergeWidth = framesLimit - 1;
-                while (runs.size() > maxMergeWidth) {
-                    int generationSeparator = 0;
-                    while (generationSeparator < runs.size() && runs.size() > maxMergeWidth) {
-                        int mergeWidth = Math.min(Math.min(runs.size() - generationSeparator, maxMergeWidth),
-                                runs.size() - maxMergeWidth + 1);
-                        FileReference newRun = ctx.createManagedWorkspaceFile(ExternalSortRunMerger.class
-                                .getSimpleName());
-                        IFrameWriter mergeResultWriter = new RunFileWriter(newRun, ctx.getIOManager());
-                        pgw = new ClusteredGroupWriter(ctx, groupFields, comparators, partialAggregatorFactory,
-                                inRecordDesc, inRecordDesc, mergeResultWriter);
-                        pgw.open();
-                        IFrameReader[] runCursors = new RunFileReader[mergeWidth];
-                        for (int i = 0; i < mergeWidth; i++) {
-                            runCursors[i] = runs.get(generationSeparator + i);
-                        }
-                        merge(pgw, runCursors);
-                        pgw.close();
-                        runs.subList(generationSeparator, mergeWidth + generationSeparator).clear();
-                        runs.add(generationSeparator++, ((RunFileWriter) mergeResultWriter).createReader());
-                    }
-                }
-                if (!runs.isEmpty()) {
-                    pgw = new ClusteredGroupWriter(ctx, groupFields, comparators, aggregatorFactory, inRecordDesc,
-                            inRecordDesc, writer);
-                    pgw.open();
-                    IFrameReader[] runCursors = new RunFileReader[runs.size()];
-                    for (int i = 0; i < runCursors.length; i++) {
-                        runCursors[i] = runs.get(i);
-                    }
-                    merge(pgw, runCursors);
-                }
-            }
-        } catch (Exception e) {
-            pgw.fail();
-            throw new HyracksDataException(e);
-        } finally {
-            pgw.close();
-        }
-    }
-
-    private void merge(IFrameWriter mergeResultWriter, IFrameReader[] runCursors) throws HyracksDataException {
-        RunMergingFrameReader merger = new RunMergingFrameReader(ctx, runCursors, inFrames, sortFields, inRecordDesc);
-        merger.open();
-        try {
-            while (merger.nextFrame(outFrame)) {
-                FrameUtils.flushFrame(outFrame, mergeResultWriter);
-            }
-        } finally {
-            merger.close();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/ffc967fd/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/sort/FastSortOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/sort/FastSortOperatorDescriptor.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/sort/FastSortOperatorDescriptor.java
deleted file mode 100644
index 85bc149..0000000
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/sort/FastSortOperatorDescriptor.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.pregelix.dataflow.std.sort;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import edu.uci.ics.hyracks.api.comm.IFrameReader;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.ActivityId;
-import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
-import edu.uci.ics.hyracks.api.dataflow.TaskId;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
-import edu.uci.ics.pregelix.dataflow.std.group.IClusteredAggregatorDescriptorFactory;
-
-public class FastSortOperatorDescriptor extends AbstractOperatorDescriptor {
-    private static final long serialVersionUID = 1L;
-
-    private static final int SORT_ACTIVITY_ID = 0;
-    private static final int MERGE_ACTIVITY_ID = 1;
-
-    private final int[] sortFields;
-    private final int framesLimit;
-
-    private final int[] groupFields;
-    private final IClusteredAggregatorDescriptorFactory aggregatorFactory;
-    private final IClusteredAggregatorDescriptorFactory partialAggregatorFactory;
-    private final RecordDescriptor combinedRecordDesc;
-    private final RecordDescriptor outputRecordDesc;
-    private final boolean localSide;
-
-    public FastSortOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int[] sortFields,
-            RecordDescriptor recordDescriptor, int[] groupFields,
-            IClusteredAggregatorDescriptorFactory partialAggregatorFactory,
-            IClusteredAggregatorDescriptorFactory aggregatorFactory, RecordDescriptor combinedRecordDesc,
-            RecordDescriptor outRecordDesc, boolean localSide) {
-        super(spec, 1, 1);
-        this.framesLimit = framesLimit;
-        this.sortFields = sortFields;
-        if (framesLimit <= 1) {
-            throw new IllegalStateException();// minimum of 2 fames (1 in,1 out)
-        }
-        this.recordDescriptors[0] = recordDescriptor;
-
-        this.groupFields = groupFields;
-        this.aggregatorFactory = aggregatorFactory;
-        this.partialAggregatorFactory = partialAggregatorFactory;
-        this.combinedRecordDesc = combinedRecordDesc;
-        this.outputRecordDesc = outRecordDesc;
-        this.localSide = localSide;
-    }
-
-    @Override
-    public void contributeActivities(IActivityGraphBuilder builder) {
-        SortActivity sa = new SortActivity(new ActivityId(odId, SORT_ACTIVITY_ID));
-        MergeActivity ma = new MergeActivity(new ActivityId(odId, MERGE_ACTIVITY_ID));
-
-        builder.addActivity(this, sa);
-        builder.addSourceEdge(0, sa, 0);
-
-        builder.addActivity(this, ma);
-        builder.addTargetEdge(0, ma, 0);
-
-        builder.addBlockingEdge(sa, ma);
-    }
-
-    public static class SortTaskState extends AbstractStateObject {
-        private List<IFrameReader> runs;
-        private IFrameSorter frameSorter;
-
-        public SortTaskState() {
-        }
-
-        private SortTaskState(JobId jobId, TaskId taskId) {
-            super(jobId, taskId);
-        }
-
-        @Override
-        public void toBytes(DataOutput out) throws IOException {
-
-        }
-
-        @Override
-        public void fromBytes(DataInput in) throws IOException {
-
-        }
-    }
-
-    private class SortActivity extends AbstractActivityNode {
-        private static final long serialVersionUID = 1L;
-
-        public SortActivity(ActivityId id) {
-            super(id);
-        }
-
-        @Override
-        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
-                IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
-            IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
-                private ExternalSortRunGenerator runGen;
-
-                @Override
-                public void open() throws HyracksDataException {
-                    runGen = new ExternalSortRunGenerator(ctx, sortFields, recordDescriptors[0], framesLimit,
-                            groupFields, new IBinaryComparator[] { new RawBinaryComparator() },
-                            partialAggregatorFactory, combinedRecordDesc);
-                    runGen.open();
-                }
-
-                @Override
-                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                    runGen.nextFrame(buffer);
-                }
-
-                @Override
-                public void close() throws HyracksDataException {
-                    SortTaskState state = new SortTaskState(ctx.getJobletContext().getJobId(), new TaskId(
-                            getActivityId(), partition));
-                    runGen.close();
-                    state.runs = runGen.getRuns();
-                    state.frameSorter = runGen.getFrameSorter();
-                    ctx.setStateObject(state);
-                }
-
-                @Override
-                public void fail() throws HyracksDataException {
-                    runGen.fail();
-                }
-            };
-            return op;
-        }
-    }
-
-    private class MergeActivity extends AbstractActivityNode {
-        private static final long serialVersionUID = 1L;
-
-        public MergeActivity(ActivityId id) {
-            super(id);
-        }
-
-        @Override
-        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
-                IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
-            IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
-                @Override
-                public void initialize() throws HyracksDataException {
-                    SortTaskState state = (SortTaskState) ctx.getStateObject(new TaskId(new ActivityId(getOperatorId(),
-                            SORT_ACTIVITY_ID), partition));
-                    List<IFrameReader> runs = state.runs;
-                    IFrameSorter frameSorter = state.frameSorter;
-                    int necessaryFrames = Math.min(runs.size() + 2, framesLimit);
-                    ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, frameSorter, runs, sortFields,
-                            combinedRecordDesc, outputRecordDesc, necessaryFrames, writer, groupFields,
-                            new IBinaryComparator[] { new RawBinaryComparator() }, partialAggregatorFactory,
-                            aggregatorFactory, localSide);
-                    merger.process();
-                }
-            };
-            return op;
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/ffc967fd/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/sort/FrameSorterQuickSort.java
----------------------------------------------------------------------
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/sort/FrameSorterQuickSort.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/sort/FrameSorterQuickSort.java
deleted file mode 100644
index d50e708..0000000
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/sort/FrameSorterQuickSort.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.pregelix.dataflow.std.sort;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.dataflow.common.util.IntSerDeUtils;
-
-public class FrameSorterQuickSort implements IFrameSorter {
-    private final IHyracksTaskContext ctx;
-    private final int[] sortFields;
-    private final List<ByteBuffer> buffers;
-
-    private final FrameTupleAccessor fta1;
-    private final FrameTupleAccessor fta2;
-
-    private final FrameTupleAppender appender;
-
-    private final ByteBuffer outFrame;
-
-    private int dataFrameCount;
-    private int[] tPointers;
-    private int tupleCount;
-
-    private final RawBinaryComparator[] comparators = new RawBinaryComparator[] { new RawBinaryComparator() };
-    private final RawNormalizedKeyComputer nkc = new RawNormalizedKeyComputer();
-
-    public FrameSorterQuickSort(IHyracksTaskContext ctx, int[] sortFields, RecordDescriptor recordDescriptor)
-            throws HyracksDataException {
-        this.ctx = ctx;
-        this.sortFields = sortFields;
-        buffers = new ArrayList<ByteBuffer>();
-        fta1 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
-        fta2 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
-        appender = new FrameTupleAppender(ctx.getFrameSize());
-        outFrame = ctx.allocateFrame();
-
-        dataFrameCount = 0;
-    }
-
-    @Override
-    public void reset() {
-        dataFrameCount = 0;
-        tupleCount = 0;
-    }
-
-    @Override
-    public int getFrameCount() {
-        return dataFrameCount;
-    }
-
-    @Override
-    public void insertFrame(ByteBuffer buffer) throws HyracksDataException {
-        ByteBuffer copyFrame;
-        if (dataFrameCount == buffers.size()) {
-            copyFrame = ctx.allocateFrame();
-            buffers.add(copyFrame);
-        } else {
-            copyFrame = buffers.get(dataFrameCount);
-        }
-        FrameUtils.copy(buffer, copyFrame);
-        ++dataFrameCount;
-    }
-
-    @Override
-    public void sortFrames() {
-        int nBuffers = dataFrameCount;
-        tupleCount = 0;
-        for (int i = 0; i < nBuffers; ++i) {
-            fta1.reset(buffers.get(i));
-            tupleCount += fta1.getTupleCount();
-        }
-        int sfIdx = sortFields[0];
-        tPointers = tPointers == null || tPointers.length < tupleCount * 4 ? new int[tupleCount * 4] : tPointers;
-        int ptr = 0;
-        for (int i = 0; i < nBuffers; ++i) {
-            fta1.reset(buffers.get(i));
-            int tCount = fta1.getTupleCount();
-            byte[] array = fta1.getBuffer().array();
-            for (int j = 0; j < tCount; ++j) {
-                int tStart = fta1.getTupleStartOffset(j);
-                int tEnd = fta1.getTupleEndOffset(j);
-                tPointers[ptr * 4] = i << 16;
-                tPointers[ptr * 4 + 1] = tStart;
-                tPointers[ptr * 4 + 2] = tEnd;
-                int f0StartRel = fta1.getFieldStartOffset(j, sfIdx);
-                int f0EndRel = fta1.getFieldEndOffset(j, sfIdx);
-                int f0Start = f0StartRel + tStart + fta1.getFieldSlotsLength();
-                tPointers[ptr * 4 + 3] = nkc == null ? 0 : nkc.normalize(array, f0Start, f0EndRel - f0StartRel);
-                tPointers[ptr * 4] |= nkc == null ? 0 : (nkc.normalize2(array, f0Start, f0EndRel - f0StartRel) & 0xff);
-                ++ptr;
-            }
-        }
-        if (tupleCount > 0) {
-            sort(tPointers, 0, tupleCount);
-        }
-    }
-
-    @Override
-    public void flushFrames(IFrameWriter writer) throws HyracksDataException {
-        appender.reset(outFrame, true);
-        for (int ptr = 0; ptr < tupleCount; ++ptr) {
-            int i = tPointers[ptr * 4] >>> 16;
-            int tStart = tPointers[ptr * 4 + 1];
-            int tEnd = tPointers[ptr * 4 + 2];
-            ByteBuffer buffer = buffers.get(i);
-            fta1.reset(buffer);
-            if (!appender.append(fta1, tStart, tEnd)) {
-                FrameUtils.flushFrame(outFrame, writer);
-                appender.reset(outFrame, true);
-                if (!appender.append(fta1, tStart, tEnd)) {
-                    throw new HyracksDataException("Record size (" + (tEnd - tStart) + ") larger than frame size ("
-                            + appender.getBuffer().capacity() + ")");
-                }
-            }
-        }
-        if (appender.getTupleCount() > 0) {
-            FrameUtils.flushFrame(outFrame, writer);
-        }
-    }
-
-    private void sort(int[] tPointers, int offset, int length) {
-        int m = offset + (length >> 1);
-        int mi = tPointers[m * 4] >>> 16;
-        int mu = tPointers[m * 4] & 0xff;
-        int mj = tPointers[m * 4 + 1];
-        int mv = tPointers[m * 4 + 3];
-
-        int a = offset;
-        int b = a;
-        int c = offset + length - 1;
-        int d = c;
-        while (true) {
-            while (b <= c) {
-                int cmp = compare(tPointers, b, mi, mj, mv, mu);
-                if (cmp > 0) {
-                    break;
-                }
-                if (cmp == 0) {
-                    swap(tPointers, a++, b);
-                }
-                ++b;
-            }
-            while (c >= b) {
-                int cmp = compare(tPointers, c, mi, mj, mv, mu);
-                if (cmp < 0) {
-                    break;
-                }
-                if (cmp == 0) {
-                    swap(tPointers, c, d--);
-                }
-                --c;
-            }
-            if (b > c)
-                break;
-            swap(tPointers, b++, c--);
-        }
-
-        int s;
-        int n = offset + length;
-        s = Math.min(a - offset, b - a);
-        vecswap(tPointers, offset, b - s, s);
-        s = Math.min(d - c, n - d - 1);
-        vecswap(tPointers, b, n - s, s);
-
-        if ((s = b - a) > 1) {
-            sort(tPointers, offset, s);
-        }
-        if ((s = d - c) > 1) {
-            sort(tPointers, n - s, s);
-        }
-    }
-
-    private void swap(int x[], int a, int b) {
-        for (int i = 0; i < 4; ++i) {
-            int t = x[a * 4 + i];
-            x[a * 4 + i] = x[b * 4 + i];
-            x[b * 4 + i] = t;
-        }
-    }
-
-    private void vecswap(int x[], int a, int b, int n) {
-        for (int i = 0; i < n; i++, a++, b++) {
-            swap(x, a, b);
-        }
-    }
-
-    private int compare(int[] tPointers, int tp1, int tp2i, int tp2j, int tp2v, int tp2u) {
-        int v1 = tPointers[tp1 * 4 + 3];
-        if (v1 != tp2v) {
-            return v1 < tp2v ? -1 : 1;
-        }
-        int u1 = tPointers[tp1 * 4] & 0xff;
-        if (u1 != tp2u) {
-            return u1 < tp2u ? -1 : 1;
-        }
-        int i1 = tPointers[tp1 * 4] >>> 16;
-        int j1 = tPointers[tp1 * 4 + 1];
-        int i2 = tp2i;
-        int j2 = tp2j;
-        ByteBuffer buf1 = buffers.get(i1);
-        ByteBuffer buf2 = buffers.get(i2);
-        byte[] b1 = buf1.array();
-        byte[] b2 = buf2.array();
-        fta1.reset(buf1);
-        fta2.reset(buf2);
-        for (int f = 0; f < comparators.length; ++f) {
-            int fIdx = sortFields[f];
-            int f1Start = fIdx == 0 ? 0 : IntSerDeUtils.getInt(b1, j1 + (fIdx - 1) * 4);
-            int f1End = IntSerDeUtils.getInt(b1, j1 + fIdx * 4);
-            int s1 = j1 + fta1.getFieldSlotsLength() + f1Start;
-            int l1 = f1End - f1Start;
-            int f2Start = fIdx == 0 ? 0 : IntSerDeUtils.getInt(b2, j2 + (fIdx - 1) * 4);
-            int f2End = IntSerDeUtils.getInt(b2, j2 + fIdx * 4);
-            int s2 = j2 + fta2.getFieldSlotsLength() + f2Start;
-            int l2 = f2End - f2Start;
-            int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
-            if (c != 0) {
-                return c;
-            }
-        }
-        return 0;
-    }
-
-    @Override
-    public void close() {
-        this.buffers.clear();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/ffc967fd/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/sort/IFrameSorter.java
----------------------------------------------------------------------
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/sort/IFrameSorter.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/sort/IFrameSorter.java
deleted file mode 100644
index de16aca..0000000
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/sort/IFrameSorter.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.pregelix.dataflow.std.sort;
-
-import java.nio.ByteBuffer;
-
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-
-public interface IFrameSorter {
-
-    public void reset();
-
-    public int getFrameCount();
-
-    public void insertFrame(ByteBuffer buffer) throws HyracksDataException;
-
-    public void sortFrames();
-
-    public void flushFrames(IFrameWriter writer) throws HyracksDataException;
-
-    public void close();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/ffc967fd/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/sort/IRunGenerator.java
----------------------------------------------------------------------
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/sort/IRunGenerator.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/sort/IRunGenerator.java
deleted file mode 100644
index c193a2d..0000000
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/sort/IRunGenerator.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.pregelix.dataflow.std.sort;
-
-import java.util.List;
-
-import edu.uci.ics.hyracks.api.comm.IFrameReader;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-
-/**
- * @author pouria
- *         Interface for the Run Generator
- */
-public interface IRunGenerator extends IFrameWriter {
-
-    /**
-     * @return the list of generated (sorted) runs
-     */
-    public List<IFrameReader> getRuns();
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/ffc967fd/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/sort/RawBinaryComparator.java
----------------------------------------------------------------------
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/sort/RawBinaryComparator.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/sort/RawBinaryComparator.java
deleted file mode 100644
index d6db3c8..0000000
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/sort/RawBinaryComparator.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.pregelix.dataflow.std.sort;
-
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-
-public final class RawBinaryComparator implements IBinaryComparator {
-
-    @Override
-    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
-        if (b1 == b2 && s1 == s2) {
-            return 0;
-        }
-        int commonLength = Math.min(l1, l2);
-        for (int i = 0; i < commonLength; i++) {
-            if (b1[s1 + i] != b2[s2 + i]) {
-                return (b1[s1 + i] & 0xff) - (b2[s2 + i] & 0xff);
-            }
-        }
-        int difference = l1 - l2;
-        return difference == 0 ? 0 : (difference > 0 ? 1 : -1);
-    }
-}
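
[Editor's note] The removed RawBinaryComparator orders raw bytes lexicographically, masking each byte with 0xff so bytes compare as unsigned values, and breaks ties on a common prefix by length. A tiny standalone illustration of that ordering (hypothetical class name, not part of the commit):

// Illustrative check of unsigned lexicographic byte ordering (hypothetical).
public final class UnsignedByteOrderDemo {
    static int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        int common = Math.min(l1, l2);
        for (int i = 0; i < common; i++) {
            int d = (b1[s1 + i] & 0xff) - (b2[s2 + i] & 0xff);  // unsigned byte comparison
            if (d != 0) {
                return d;
            }
        }
        return Integer.compare(l1, l2);                         // shorter key sorts first on a tie
    }

    public static void main(String[] args) {
        byte[] a = { 0x01, (byte) 0xff };
        byte[] b = { 0x01, 0x02 };
        // 0xff is 255 when treated as unsigned, so `a` sorts after `b`.
        System.out.println(compare(a, 0, a.length, b, 0, b.length) > 0);  // true
    }
}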

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/ffc967fd/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/sort/RawNormalizedKeyComputer.java
----------------------------------------------------------------------
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/sort/RawNormalizedKeyComputer.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/sort/RawNormalizedKeyComputer.java
deleted file mode 100644
index f43b499..0000000
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/sort/RawNormalizedKeyComputer.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.pregelix.dataflow.std.sort;
-
-public final class RawNormalizedKeyComputer {
-
-    public int normalize(byte[] bytes, int start, int length) {
-        int nk = 0;
-        for (int i = 0; i < 4; i++) {
-            nk <<= 8;
-            if (i < length) {
-                nk += (bytes[start + i] & 0xff);
-            }
-        }
-        return nk ^ Integer.MIN_VALUE;
-    }
-
-    public int normalize2(byte[] bytes, int start, int length) {
-        int nk = 0;
-        for (int i = 4; i < 6; i++) {
-            nk <<= 8;
-            if (i < length) {
-                nk += (bytes[start + i] & 0xff);
-            }
-        }
-        return nk;
-    }
-
-    public int normalize4(byte[] bytes, int start, int length) {
-        int nk = 0;
-        for (int i = 4; i < 8; i++) {
-            nk <<= 8;
-            if (i < length) {
-                nk += (bytes[start + i] & 0xff);
-            }
-        }
-        return nk ^ Integer.MIN_VALUE;
-    }
-}
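
[Editor's note] normalize() above packs the first four key bytes into an int in big-endian order and XORs the result with Integer.MIN_VALUE. Flipping the sign bit is what lets a signed int comparison reproduce the unsigned byte-wise order, so the sorter can often resolve a comparison from the normalized key alone. A small hypothetical check of that property:

// Hypothetical demonstration that the sign-bit flip preserves unsigned order (not part of the commit).
public final class NormalizedKeyDemo {
    static int normalize(byte[] bytes, int start, int length) {
        int nk = 0;
        for (int i = 0; i < 4; i++) {
            nk <<= 8;
            if (i < length) {
                nk += (bytes[start + i] & 0xff);   // pack up to four leading bytes, big-endian
            }
        }
        return nk ^ Integer.MIN_VALUE;             // flip the sign bit
    }

    public static void main(String[] args) {
        byte[] small = { 0x00, 0x01 };
        byte[] large = { (byte) 0x80, 0x00 };      // 0x80 exceeds 0x00 when bytes are unsigned
        // Without the XOR, `large` would pack to a negative int and wrongly sort before `small`.
        System.out.println(normalize(small, 0, small.length) < normalize(large, 0, large.length)); // true
    }
}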

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/ffc967fd/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/sort/ReferencedPriorityQueue.java
----------------------------------------------------------------------
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/sort/ReferencedPriorityQueue.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/sort/ReferencedPriorityQueue.java
deleted file mode 100644
index 7f2db55..0000000
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/sort/ReferencedPriorityQueue.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.pregelix.dataflow.std.sort;
-
-import java.io.IOException;
-
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.pregelix.dataflow.std.sort.RunMergingFrameReader.EntryComparator;
-import edu.uci.ics.pregelix.dataflow.std.util.ReferenceEntry;
-
-public class ReferencedPriorityQueue {
-    private final int frameSize;
-    private final RecordDescriptor recordDescriptor;
-    private final ReferenceEntry entries[];
-    private final int size;
-    private int nItems;
-
-    private final EntryComparator comparator;
-    private final RawNormalizedKeyComputer nmkComputer = new RawNormalizedKeyComputer();
-    private final int[] keyFields;
-
-    public ReferencedPriorityQueue(int frameSize, RecordDescriptor recordDescriptor, int initSize,
-            EntryComparator comparator, int[] keyFields) {
-        this.frameSize = frameSize;
-        this.recordDescriptor = recordDescriptor;
-        if (initSize < 1)
-            throw new IllegalArgumentException();
-        this.comparator = comparator;
-        this.keyFields = keyFields;
-        nItems = initSize;
-        size = (initSize + 1) & 0xfffffffe;
-        entries = new ReferenceEntry[size];
-        for (int i = 0; i < size; i++) {
-            entries[i] = new ReferenceEntry(i, null, -1, keyFields, nmkComputer);
-        }
-        for (int i = initSize; i < size; i++) {
-            entries[i].setExhausted();
-        }
-    }
-
-    /**
-     * Retrieve the top entry without removing it
-     * 
-     * @return the top entry
-     */
-    public ReferenceEntry peek() {
-        return entries[0];
-    }
-
-    /**
-     * compare the new entry with entries within the queue, to find a spot for
-     * this new entry
-     * 
-     * @param entry
-     * @return runid of this entry
-     * @throws IOException
-     */
-    public int popAndReplace(FrameTupleAccessor fta, int tIndex) {
-        ReferenceEntry entry = entries[0];
-        if (entry.getAccessor() == null) {
-            entry.setAccessor(new FrameTupleAccessor(frameSize, recordDescriptor));
-        }
-        entry.getAccessor().reset(fta.getBuffer());
-        entry.setTupleIndex(tIndex, keyFields, nmkComputer);
-
-        add(entry);
-        return entry.getRunid();
-    }
-
-    /**
-     * Push entry into priority queue
-     * 
-     * @param e
-     *            the new Entry
-     */
-    private void add(ReferenceEntry e) {
-        ReferenceEntry min = entries[0];
-        int slot = (size >> 1) + (min.getRunid() >> 1);
-
-        ReferenceEntry curr = e;
-        while (nItems > 0 && slot > 0) {
-            int c = 0;
-            if (entries[slot].isExhausted()) {
-                // run of entries[slot] is exhausted, i.e. not available, curr
-                // wins
-                c = 1;
-            } else if (entries[slot].getAccessor() != null /*
-                                                            * entries[slot] is
-                                                            * not MIN value
-                                                            */
-                    && !curr.isExhausted() /* curr run is available */) {
-
-                if (curr.getAccessor() != null) {
-                    c = comparator.compare(entries[slot], curr);
-                } else {
-                    // curr is MIN value, wins
-                    c = 1;
-                }
-            }
-
-            if (c <= 0) { // curr lost
-                // entries[slot] swaps up
-                ReferenceEntry tmp = entries[slot];
-                entries[slot] = curr;
-                curr = tmp;// winner to pass up
-            }// else curr wins
-            slot = slot >> 1;
-        }
-        // set new entries[0]
-        entries[0] = curr;
-    }
-
-    /**
-     * Pop is called only when a run is exhausted
-     * 
-     * @return
-     */
-    public ReferenceEntry pop() {
-        ReferenceEntry min = entries[0];
-        min.setExhausted();
-        add(min);
-        nItems--;
-        return min;
-    }
-
-    public boolean areRunsExhausted() {
-        return nItems <= 0;
-    }
-
-    public int size() {
-        return nItems;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/ffc967fd/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/sort/RunMergingFrameReader.java
----------------------------------------------------------------------
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/sort/RunMergingFrameReader.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/sort/RunMergingFrameReader.java
deleted file mode 100644
index c8dea63..0000000
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/sort/RunMergingFrameReader.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.pregelix.dataflow.std.sort;
-
-import java.nio.ByteBuffer;
-import java.util.Comparator;
-import java.util.List;
-
-import edu.uci.ics.hyracks.api.comm.IFrameReader;
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.pregelix.dataflow.std.util.ReferenceEntry;
-
-public class RunMergingFrameReader implements IFrameReader {
-    private final IHyracksTaskContext ctx;
-    private final IFrameReader[] runCursors;
-    private final List<ByteBuffer> inFrames;
-    private final int[] sortFields;
-    private final RawBinaryComparator[] comparators = new RawBinaryComparator[] { new RawBinaryComparator() };
-    private final RecordDescriptor recordDesc;
-    private final FrameTupleAppender outFrameAppender;
-    private ReferencedPriorityQueue topTuples;
-    private int[] tupleIndexes;
-    private FrameTupleAccessor[] tupleAccessors;
-
-    public RunMergingFrameReader(IHyracksTaskContext ctx, IFrameReader[] runCursors, List<ByteBuffer> inFrames,
-            int[] sortFields, RecordDescriptor recordDesc) {
-        this.ctx = ctx;
-        this.runCursors = runCursors;
-        this.inFrames = inFrames;
-        this.sortFields = sortFields;
-        this.recordDesc = recordDesc;
-        outFrameAppender = new FrameTupleAppender(ctx.getFrameSize());
-    }
-
-    @Override
-    public void open() throws HyracksDataException {
-        tupleAccessors = new FrameTupleAccessor[runCursors.length];
-        EntryComparator comparator = createEntryComparator(comparators);
-        topTuples = new ReferencedPriorityQueue(ctx.getFrameSize(), recordDesc, runCursors.length, comparator,
-                sortFields);
-        tupleIndexes = new int[runCursors.length];
-        for (int i = 0; i < runCursors.length; i++) {
-            tupleIndexes[i] = 0;
-            int runIndex = topTuples.peek().getRunid();
-            runCursors[runIndex].open();
-            if (runCursors[runIndex].nextFrame(inFrames.get(runIndex))) {
-                tupleAccessors[runIndex] = new FrameTupleAccessor(ctx.getFrameSize(), recordDesc);
-                tupleAccessors[runIndex].reset(inFrames.get(runIndex));
-                setNextTopTuple(runIndex, tupleIndexes, runCursors, tupleAccessors, topTuples);
-            } else {
-                closeRun(runIndex, runCursors, tupleAccessors);
-                topTuples.pop();
-            }
-        }
-    }
-
-    @Override
-    public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        outFrameAppender.reset(buffer, true);
-        while (!topTuples.areRunsExhausted()) {
-            ReferenceEntry top = topTuples.peek();
-            int runIndex = top.getRunid();
-            FrameTupleAccessor fta = top.getAccessor();
-            int tupleIndex = top.getTupleIndex();
-
-            if (!outFrameAppender.append(fta, tupleIndex)) {
-                return true;
-            }
-
-            ++tupleIndexes[runIndex];
-            setNextTopTuple(runIndex, tupleIndexes, runCursors, tupleAccessors, topTuples);
-        }
-
-        if (outFrameAppender.getTupleCount() > 0) {
-            return true;
-        }
-        return false;
-    }
-
-    @Override
-    public void close() throws HyracksDataException {
-        for (int i = 0; i < runCursors.length; ++i) {
-            closeRun(i, runCursors, tupleAccessors);
-        }
-    }
-
-    private void setNextTopTuple(int runIndex, int[] tupleIndexes, IFrameReader[] runCursors,
-            FrameTupleAccessor[] tupleAccessors, ReferencedPriorityQueue topTuples) throws HyracksDataException {
-        boolean exists = hasNextTuple(runIndex, tupleIndexes, runCursors, tupleAccessors);
-        if (exists) {
-            topTuples.popAndReplace(tupleAccessors[runIndex], tupleIndexes[runIndex]);
-        } else {
-            topTuples.pop();
-            closeRun(runIndex, runCursors, tupleAccessors);
-        }
-    }
-
-    private boolean hasNextTuple(int runIndex, int[] tupleIndexes, IFrameReader[] runCursors,
-            FrameTupleAccessor[] tupleAccessors) throws HyracksDataException {
-        if (tupleAccessors[runIndex] == null || runCursors[runIndex] == null) {
-            return false;
-        } else if (tupleIndexes[runIndex] >= tupleAccessors[runIndex].getTupleCount()) {
-            ByteBuffer buf = tupleAccessors[runIndex].getBuffer(); // same-as-inFrames.get(runIndex)
-            if (runCursors[runIndex].nextFrame(buf)) {
-                tupleIndexes[runIndex] = 0;
-                return hasNextTuple(runIndex, tupleIndexes, runCursors, tupleAccessors);
-            } else {
-                return false;
-            }
-        } else {
-            return true;
-        }
-    }
-
-    private void closeRun(int index, IFrameReader[] runCursors, IFrameTupleAccessor[] tupleAccessors)
-            throws HyracksDataException {
-        if (runCursors[index] != null) {
-            runCursors[index].close();
-            runCursors[index] = null;
-            tupleAccessors[index] = null;
-        }
-    }
-
-    private EntryComparator createEntryComparator(final RawBinaryComparator[] comparators) {
-        return new EntryComparator();
-    }
-
-    class EntryComparator implements Comparator<ReferenceEntry> {
-
-        @Override
-        public int compare(ReferenceEntry tp1, ReferenceEntry tp2) {
-            int nmk1 = tp1.getNormalizedKey();
-            int nmk2 = tp2.getNormalizedKey();
-            if (nmk1 != nmk2) {
-                return nmk1 > nmk2 ? 1 : -1;
-            }
-            int nmk3 = tp1.getNormalizedKey4();
-            int nmk4 = tp2.getNormalizedKey4();
-            if (nmk3 != nmk4) {
-                return nmk3 > nmk4 ? 1 : -1;
-            }
-
-            FrameTupleAccessor fta1 = (FrameTupleAccessor) tp1.getAccessor();
-            FrameTupleAccessor fta2 = (FrameTupleAccessor) tp2.getAccessor();
-            byte[] b1 = fta1.getBuffer().array();
-            byte[] b2 = fta2.getBuffer().array();
-            int[] tPointers1 = tp1.getTPointers();
-            int[] tPointers2 = tp2.getTPointers();
-
-            for (int f = 0; f < sortFields.length; ++f) {
-                int c = comparators[f].compare(b1, tPointers1[2 * f + 2], tPointers1[2 * f + 3], b2,
-                        tPointers2[2 * f + 2], tPointers2[2 * f + 3]);
-                if (c != 0) {
-                    return c;
-                }
-            }
-            return 0;
-        }
-
-    }
-}
\ No newline at end of file
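
[Editor's note] The removed RunMergingFrameReader streams a k-way merge of the sorted runs through the ReferencedPriorityQueue shown earlier: the queue always exposes the smallest head tuple, and the run that produced it is advanced and reinserted. A compact, hypothetical sketch of the same merge over in-memory lists, using java.util.PriorityQueue instead of the tournament queue:

import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical k-way merge sketch (not the Hyracks classes removed above).
final class KWayMergeSketch {
    static List<Integer> merge(List<List<Integer>> sortedRuns) {
        // Each heap entry is {value, runIndex}, ordered by value.
        PriorityQueue<int[]> heap = new PriorityQueue<>(Comparator.comparingInt(e -> e[0]));
        List<Iterator<Integer>> cursors = new ArrayList<>();
        for (int i = 0; i < sortedRuns.size(); i++) {
            Iterator<Integer> it = sortedRuns.get(i).iterator();
            cursors.add(it);
            if (it.hasNext()) {
                heap.add(new int[] { it.next(), i });          // seed with each run's head
            }
        }
        List<Integer> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] top = heap.poll();
            out.add(top[0]);
            Iterator<Integer> it = cursors.get(top[1]);
            if (it.hasNext()) {
                heap.add(new int[] { it.next(), top[1] });     // advance the winning run
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(merge(List.of(List.of(1, 4, 7), List.of(2, 5), List.of(3, 6, 8))));
        // Prints [1, 2, 3, 4, 5, 6, 7, 8]
    }
}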

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/ffc967fd/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/util/CopyUpdateUtil.java
----------------------------------------------------------------------
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/util/CopyUpdateUtil.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/util/CopyUpdateUtil.java
deleted file mode 100644
index be2255f..0000000
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/util/CopyUpdateUtil.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.pregelix.dataflow.std.util;
-
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
-import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
-import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
-import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
-import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
-
-public class CopyUpdateUtil {
-
-    public static void copyUpdate(SearchKeyTupleReference tempTupleReference, ITupleReference frameTuple,
-            UpdateBuffer updateBuffer, ArrayTupleBuilder cloneUpdateTb, IIndexAccessor indexAccessor,
-            IIndexCursor cursor, RangePredicate rangePred, boolean scan, StorageType type) throws HyracksDataException,
-            IndexException {
-        if (cloneUpdateTb.getSize() > 0) {
-            if (!updateBuffer.appendTuple(cloneUpdateTb)) {
-                tempTupleReference.reset(frameTuple.getFieldData(0), frameTuple.getFieldStart(0),
-                        frameTuple.getFieldLength(0));
-                //release the cursor/latch
-                cursor.close();
-                //batch update
-                updateBuffer.updateIndex(indexAccessor);
-                //try appending the to-be-updated tuple again
-                if (!updateBuffer.appendTuple(cloneUpdateTb)) {
-                    throw new HyracksDataException("cannot append tuple builder!");
-                }
-                //search again and restore the cursor to the exact position it had before it was closed
-                cursor.reset();
-                rangePred.setLowKey(tempTupleReference, true);
-                if (scan) {
-                    rangePred.setHighKey(null, true);
-                } else {
-                    rangePred.setHighKey(tempTupleReference, true);
-                }
-                indexAccessor.search(cursor, rangePred);
-                if (cursor.hasNext()) {
-                    cursor.next();
-                }
-            }
-            cloneUpdateTb.reset();
-        }
-    }
-}
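
The comments in copyUpdate spell out an ordering constraint: when the update buffer fills, the current key is copied out of the frame, the cursor is closed to release its latch, the batched updates are flushed, and the scan is re-opened at the saved key. The sketch below shows that sequence with hypothetical Cursor, Index, and UpdateBuffer interfaces standing in for the Hyracks storage types; it illustrates the protocol, not the actual API:

// Schematic of the close-flush-reseek protocol described by copyUpdate,
// using hypothetical interfaces in place of the Hyracks storage-AM types.
final class BatchedUpdateSketch {

    interface Cursor { void close(); }

    interface Index {
        void applyBatch(UpdateBuffer buffer);              // flush buffered updates
        Cursor openAt(byte[] lowKey, boolean inclusive);   // re-open a scan at a key
    }

    interface UpdateBuffer {
        boolean append(byte[] tuple);                      // false when full
        void clear();
    }

    static Cursor bufferOrFlush(Index index, Cursor cursor, UpdateBuffer buffer,
            byte[] currentKey, byte[] updatedTuple) {
        if (buffer.append(updatedTuple)) {
            return cursor;                                 // fits: keep scanning with the same cursor
        }
        byte[] savedKey = currentKey.clone();              // 1. copy the key out of the soon-reused frame
        cursor.close();                                    // 2. release the cursor and its latch
        index.applyBatch(buffer);                          // 3. flush the batched updates
        buffer.clear();
        if (!buffer.append(updatedTuple)) {                // 4. retry the append into the empty buffer
            throw new IllegalStateException("tuple larger than the update buffer");
        }
        return index.openAt(savedKey, true);               // 5. resume the scan at the saved key
    }
}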

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/ffc967fd/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/util/FunctionProxy.java
----------------------------------------------------------------------
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/util/FunctionProxy.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/util/FunctionProxy.java
deleted file mode 100644
index ee9639a..0000000
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/util/FunctionProxy.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.pregelix.dataflow.std.util;
-
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
-import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
-import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
-import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
-import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunction;
-import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
-
-public class FunctionProxy {
-
-    private final IUpdateFunction function;
-    private final IRuntimeHookFactory preHookFactory;
-    private final IRuntimeHookFactory postHookFactory;
-    private final IRecordDescriptorFactory inputRdFactory;
-    private final IHyracksTaskContext ctx;
-    private final IFrameWriter[] writers;
-    private TupleDeserializer tupleDe;
-    private RecordDescriptor inputRd;
-    private ClassLoader ctxCL;
-    private boolean initialized = false;
-
-    public FunctionProxy(IHyracksTaskContext ctx, IUpdateFunctionFactory functionFactory,
-            IRuntimeHookFactory preHookFactory, IRuntimeHookFactory postHookFactory,
-            IRecordDescriptorFactory inputRdFactory, IFrameWriter[] writers) {
-        this.function = functionFactory.createFunction();
-        this.preHookFactory = preHookFactory;
-        this.postHookFactory = postHookFactory;
-        this.inputRdFactory = inputRdFactory;
-        this.writers = writers;
-        this.ctx = ctx;
-    }
-
-    /**
-     * Initialize the function
-     * 
-     * @throws HyracksDataException
-     */
-    public void functionOpen() throws HyracksDataException {
-        ctxCL = Thread.currentThread().getContextClassLoader();
-        Thread.currentThread().setContextClassLoader(ctx.getJobletContext().getClassLoader());
-        for (IFrameWriter writer : writers) {
-            writer.open();
-        }
-
-    }
-
-    private void init() throws HyracksDataException {
-        inputRd = inputRdFactory.createRecordDescriptor(ctx);
-        tupleDe = new TupleDeserializer(inputRd);
-        if (preHookFactory != null)
-            preHookFactory.createRuntimeHook().configure(ctx);
-        function.open(ctx, inputRd, writers);
-    }
-
-    /**
-     * Call the function
-     * 
-     * @param leftAccessor
-     *            input page accessor
-     * @param leftTupleIndex
-     *            the tuple index in the page
-     * @param right
-     *            the B-tree tuple that may be updated in place
-     * @throws HyracksDataException
-     */
-    public void functionCall(IFrameTupleAccessor leftAccessor, int leftTupleIndex, ITupleReference right,
-            ArrayTupleBuilder cloneUpdateTb, IIndexCursor cursor) throws HyracksDataException {
-        if (!initialized) {
-            init();
-            initialized = true;
-        }
-        Object[] tuple = tupleDe.deserializeRecord(leftAccessor, leftTupleIndex, right);
-        function.process(tuple);
-        function.update(right, cloneUpdateTb, cursor);
-    }
-
-    /**
-     * Call the function without a newly generated tuple, using just the tuple in the B-tree
-     * 
-     * @param updateRef
-     * @throws HyracksDataException
-     */
-    public void functionCall(ITupleReference updateRef, ArrayTupleBuilder cloneUpdateTb, IIndexCursor cursor)
-            throws HyracksDataException {
-        if (!initialized) {
-            init();
-            initialized = true;
-        }
-        Object[] tuple = tupleDe.deserializeRecord(updateRef);
-        function.process(tuple);
-        function.update(updateRef, cloneUpdateTb, cursor);
-    }
-
-    /**
-     * Call the function
-     * 
-     * @param tb
-     *            input data
-     * @param inPlaceUpdateRef
-     *            update pointer
-     * @throws HyracksDataException
-     */
-    public void functionCall(ArrayTupleBuilder tb, ITupleReference inPlaceUpdateRef, ArrayTupleBuilder cloneUpdateTb,
-            IIndexCursor cursor, boolean nullLeft) throws HyracksDataException {
-        if (!initialized) {
-            init();
-            initialized = true;
-        }
-        Object[] tuple = tupleDe.deserializeRecord(tb, inPlaceUpdateRef, nullLeft);
-        if (tuple[1] == null) {
-            /** skip a vertex that should not be invoked */
-            return;
-        }
-        function.process(tuple);
-        function.update(inPlaceUpdateRef, cloneUpdateTb, cursor);
-    }
-
-    /**
-     * Close the function
-     * 
-     * @throws HyracksDataException
-     */
-    public void functionClose() throws HyracksDataException {
-        if (initialized) {
-            if (postHookFactory != null)
-                postHookFactory.createRuntimeHook().configure(ctx);
-            function.close();
-        }
-        for (IFrameWriter writer : writers) {
-            writer.close();
-        }
-        Thread.currentThread().setContextClassLoader(ctxCL);
-    }
-}
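
functionOpen above saves the thread's context class loader and installs the joblet's loader so user-supplied classes resolve against the job, and functionClose restores the saved loader after the writers are closed. A minimal standalone sketch of that save/set/restore pattern; the jobClassLoader parameter and the try/finally restore are illustrative (the deleted code restores the loader in functionClose instead):

// Sketch of the context-class-loader swap done by functionOpen()/functionClose().
final class ClassLoaderSwapSketch {

    static void runWithJobClassLoader(ClassLoader jobClassLoader, Runnable body) {
        Thread current = Thread.currentThread();
        ClassLoader saved = current.getContextClassLoader();   // remember the caller's loader
        current.setContextClassLoader(jobClassLoader);          // user classes now resolve against the job
        try {
            body.run();
        } finally {
            current.setContextClassLoader(saved);               // always restore on the way out
        }
    }
}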

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/ffc967fd/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/util/ReferenceEntry.java
----------------------------------------------------------------------
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/util/ReferenceEntry.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/util/ReferenceEntry.java
deleted file mode 100644
index c22dc34..0000000
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/util/ReferenceEntry.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.pregelix.dataflow.std.util;
-
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.pregelix.dataflow.std.sort.RawNormalizedKeyComputer;
-
-public final class ReferenceEntry {
-    private final int runid;
-    private FrameTupleAccessor accessor;
-    private int tupleIndex;
-    private int[] tPointers;
-    private boolean exhausted = false;
-
-    public ReferenceEntry(int runid, FrameTupleAccessor fta, int tupleIndex, int[] keyFields,
-            RawNormalizedKeyComputer nmkComputer) {
-        super();
-        this.runid = runid;
-        this.accessor = fta;
-        this.tPointers = new int[2 + 2 * keyFields.length];
-        if (fta != null) {
-            initTPointer(fta, tupleIndex, keyFields, nmkComputer);
-        }
-    }
-
-    public int getRunid() {
-        return runid;
-    }
-
-    public FrameTupleAccessor getAccessor() {
-        return accessor;
-    }
-
-    public void setAccessor(FrameTupleAccessor fta) {
-        this.accessor = fta;
-    }
-
-    public int[] getTPointers() {
-        return tPointers;
-    }
-
-    public int getTupleIndex() {
-        return tupleIndex;
-    }
-
-    public int getNormalizedKey() {
-        return tPointers[0];
-    }
-
-    public int getNormalizedKey4() {
-        return tPointers[1];
-    }
-
-    public void setTupleIndex(int tupleIndex, int[] keyFields, RawNormalizedKeyComputer nmkComputer) {
-        initTPointer(accessor, tupleIndex, keyFields, nmkComputer);
-    }
-
-    public void setExhausted() {
-        this.exhausted = true;
-    }
-
-    public boolean isExhausted() {
-        return this.exhausted;
-    }
-
-    private void initTPointer(FrameTupleAccessor fta, int tupleIndex, int[] keyFields,
-            RawNormalizedKeyComputer nmkComputer) {
-        this.tupleIndex = tupleIndex;
-        byte[] b1 = fta.getBuffer().array();
-        for (int f = 0; f < keyFields.length; ++f) {
-            int fIdx = keyFields[f];
-            tPointers[2 * f + 2] = fta.getTupleStartOffset(tupleIndex) + fta.getFieldSlotsLength()
-                    + fta.getFieldStartOffset(tupleIndex, fIdx);
-            tPointers[2 * f + 3] = fta.getFieldEndOffset(tupleIndex, fIdx) - fta.getFieldStartOffset(tupleIndex, fIdx);
-            if (f == 0) {
-                tPointers[0] = nmkComputer == null ? 0 : nmkComputer.normalize(b1, tPointers[2], tPointers[3]);
-                tPointers[1] = nmkComputer == null ? 0 : nmkComputer.normalize4(b1, tPointers[2], tPointers[3]);
-            }
-        }
-    }
-}
\ No newline at end of file
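
initTPointer above packs everything the merge comparator needs into one flat int array: slots 0 and 1 hold the two normalized keys, and key field f contributes its (offset, length) pair at slots 2f+2 and 2f+3, which matches the indexing used by EntryComparator. A tiny reader for that assumed layout, with a hypothetical helper name:

// Reader for the flat tPointers layout built by ReferenceEntry.initTPointer:
// [ nmk, nmk4, off0, len0, off1, len1, ... ]
final class TPointersLayout {
    static int normalizedKey(int[] tPointers)      { return tPointers[0]; }
    static int normalizedKey4(int[] tPointers)     { return tPointers[1]; }
    static int fieldOffset(int[] tPointers, int f) { return tPointers[2 * f + 2]; }
    static int fieldLength(int[] tPointers, int f) { return tPointers[2 * f + 3]; }
    static int fieldCount(int[] tPointers)         { return (tPointers.length - 2) / 2; }
}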

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/ffc967fd/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/util/ResetableByteArrayInputStream.java
----------------------------------------------------------------------
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/util/ResetableByteArrayInputStream.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/util/ResetableByteArrayInputStream.java
deleted file mode 100644
index f6ef7af..0000000
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/util/ResetableByteArrayInputStream.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.pregelix.dataflow.std.util;
-
-import java.io.InputStream;
-
-public class ResetableByteArrayInputStream extends InputStream {
-
-    private byte[] data;
-    private int position;
-
-    public ResetableByteArrayInputStream() {
-    }
-
-    public void setByteArray(byte[] data, int position) {
-        this.data = data;
-        this.position = position;
-    }
-
-    @Override
-    public int read() {
-        int remaining = data.length - position;
-        int value = remaining > 0 ? (data[position++] & 0xff) : -1;
-        return value;
-    }
-
-    @Override
-    public int read(byte[] bytes, int offset, int length) {
-        int remaining = data.length - position;
-        if (remaining == 0) {
-            return -1;
-        }
-        int l = Math.min(length, remaining);
-        System.arraycopy(data, position, bytes, offset, l);
-        position += l;
-        return l;
-    }
-
-    @Override
-    public int available() {
-        return data.length - position;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/ffc967fd/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/util/ResetableByteArrayOutputStream.java
----------------------------------------------------------------------
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/util/ResetableByteArrayOutputStream.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/util/ResetableByteArrayOutputStream.java
deleted file mode 100644
index ab43a08..0000000
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/util/ResetableByteArrayOutputStream.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.pregelix.dataflow.std.util;
-
-import java.io.OutputStream;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-public class ResetableByteArrayOutputStream extends OutputStream {
-    private static final Logger LOGGER = Logger.getLogger(ResetableByteArrayOutputStream.class.getName());
-
-    private byte[] data;
-    private int position;
-
-    public ResetableByteArrayOutputStream() {
-    }
-
-    public void setByteArray(byte[] data, int position) {
-        this.data = data;
-        this.position = position;
-    }
-
-    @Override
-    public void write(int b) {
-        int remaining = data.length - position;
-        if (remaining < 1)
-            throw new IndexOutOfBoundsException();
-        data[position] = (byte) b;
-        position++;
-        if (LOGGER.isLoggable(Level.FINEST)) {
-            LOGGER.finest("write(): value: " + b + " remaining: " + remaining + " position: " + position);
-        }
-    }
-
-    @Override
-    public void write(byte[] bytes, int offset, int length) {
-        if (LOGGER.isLoggable(Level.FINEST)) {
-            LOGGER.finest("write(bytes[], int, int) offset: " + offset + " length: " + length + " position: "
-                    + position);
-        }
-        if (position + length > data.length)
-            throw new IndexOutOfBoundsException();
-        System.arraycopy(bytes, offset, data, position, length);
-        position += length;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/ffc967fd/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/util/SearchKeyTupleReference.java
----------------------------------------------------------------------
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/util/SearchKeyTupleReference.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/util/SearchKeyTupleReference.java
deleted file mode 100644
index aaa961e..0000000
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/util/SearchKeyTupleReference.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.pregelix.dataflow.std.util;
-
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
-
-public class SearchKeyTupleReference implements ITupleReference {
-
-    private byte[] copiedData;
-    private int length;
-
-    public void reset(byte[] data, int start, int len) {
-        if (copiedData == null) {
-            copiedData = new byte[len];
-        }
-        if (copiedData.length < len) {
-            copiedData = new byte[len];
-        }
-        System.arraycopy(data, start, copiedData, 0, len);
-        length = len;
-    }
-
-    @Override
-    public int getFieldCount() {
-        return 1;
-    }
-
-    @Override
-    public byte[] getFieldData(int fIdx) {
-        return copiedData;
-    }
-
-    @Override
-    public int getFieldStart(int fIdx) {
-        return 0;
-    }
-
-    @Override
-    public int getFieldLength(int fIdx) {
-        return length;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/ffc967fd/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/util/StorageType.java
----------------------------------------------------------------------
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/util/StorageType.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/util/StorageType.java
deleted file mode 100644
index af50fbe..0000000
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/util/StorageType.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.pregelix.dataflow.std.util;
-
-public enum StorageType {
-    TreeIndex,
-    LSMIndex
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/ffc967fd/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/util/TupleDeserializer.java
----------------------------------------------------------------------
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/util/TupleDeserializer.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/util/TupleDeserializer.java
deleted file mode 100644
index 458d842..0000000
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/util/TupleDeserializer.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.pregelix.dataflow.std.util;
-
-import java.io.DataInputStream;
-import java.io.IOException;
-
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
-
-public class TupleDeserializer {
-    private static final String ERROR_MSG = "Out-of-bounds read in your Writable implementations of the types for vertex id, vertex value, edge value or message --- check your readFields and write implementations";
-    private final Object[] record;
-    private final RecordDescriptor recordDescriptor;
-    private final ResetableByteArrayInputStream bbis;
-    private final DataInputStream di;
-
-    public TupleDeserializer(RecordDescriptor recordDescriptor) {
-        this.recordDescriptor = recordDescriptor;
-        this.bbis = new ResetableByteArrayInputStream();
-        this.di = new DataInputStream(bbis);
-        this.record = new Object[recordDescriptor.getFields().length];
-    }
-
-    public Object[] deserializeRecord(ITupleReference tupleRef) throws HyracksDataException {
-        try {
-            for (int i = 0; i < tupleRef.getFieldCount(); ++i) {
-                byte[] data = tupleRef.getFieldData(i);
-                int offset = tupleRef.getFieldStart(i);
-                int len = tupleRef.getFieldLength(i);
-                bbis.setByteArray(data, offset);
-
-                int availableBefore = bbis.available();
-                Object instance = recordDescriptor.getFields()[i].deserialize(di);
-                int availableAfter = bbis.available();
-                if (availableBefore - availableAfter > len) {
-                    throw new IllegalStateException(ERROR_MSG);
-                }
-
-                record[i] = instance;
-            }
-            return record;
-        } catch (IOException e) {
-            throw new HyracksDataException(e);
-        }
-    }
-
-    public Object[] deserializeRecord(IFrameTupleAccessor left, int tIndex, ITupleReference right)
-            throws HyracksDataException {
-        try {
-            /** skip vertex id field in deserialization */
-            byte[] data = left.getBuffer().array();
-            int tStart = left.getTupleStartOffset(tIndex) + left.getFieldSlotsLength();
-            int leftFieldCount = left.getFieldCount();
-            int fStart = tStart;
-            for (int i = 1; i < leftFieldCount; ++i) {
-                /**
-                 * reset the input
-                 */
-                fStart = tStart + left.getFieldStartOffset(tIndex, i);
-                int fieldLength = left.getFieldLength(tIndex, i);
-                bbis.setByteArray(data, fStart);
-
-                /**
-                 * do deserialization
-                 */
-                int availableBefore = bbis.available();
-                Object instance = recordDescriptor.getFields()[i].deserialize(di);
-                int availableAfter = bbis.available();
-                if (availableBefore - availableAfter > fieldLength) {
-                    throw new IllegalStateException(ERROR_MSG);
-
-                }
-                record[i] = instance;
-            }
-            /** skip vertex id field in deserialization */
-            for (int i = leftFieldCount + 1; i < record.length; ++i) {
-                byte[] rightData = right.getFieldData(i - leftFieldCount);
-                int rightOffset = right.getFieldStart(i - leftFieldCount);
-                int len = right.getFieldLength(i - leftFieldCount);
-                bbis.setByteArray(rightData, rightOffset);
-
-                int availableBefore = bbis.available();
-                Object instance = recordDescriptor.getFields()[i].deserialize(di);
-                int availableAfter = bbis.available();
-                if (availableBefore - availableAfter > len) {
-                    throw new IllegalStateException(ERROR_MSG);
-                }
-                record[i] = instance;
-            }
-            return record;
-        } catch (IOException e) {
-            throw new HyracksDataException(e);
-        }
-    }
-
-    public Object[] deserializeRecord(ArrayTupleBuilder tb, ITupleReference right, boolean nullLeft)
-            throws HyracksDataException {
-        try {
-            if (nullLeft) {
-                byte[] rightData = right.getFieldData(1);
-                int rightFieldOffset = right.getFieldStart(1);
-                /** skip a halted and no message vertex without deserializing it */
-                if ((rightData[rightFieldOffset] & 1) == 1) {
-                    // halt flag is the last bit of the first byte of any vertex value
-                    record[0] = null;
-                    record[1] = null;
-                    return record;
-                }
-            }
-
-            byte[] data = tb.getByteArray();
-            int[] offset = tb.getFieldEndOffsets();
-            int start = 0;
-            /** skip vertex id fields in deserialization */
-            for (int i = 1; i < offset.length; ++i) {
-                /**
-                 * reset the input
-                 */
-                start = offset[i - 1];
-                bbis.setByteArray(data, start);
-                int fieldLength = i == 0 ? offset[0] : offset[i] - offset[i - 1];
-
-                /**
-                 * do deserialization
-                 */
-                int availableBefore = bbis.available();
-                Object instance = recordDescriptor.getFields()[i].deserialize(di);
-                int availableAfter = bbis.available();
-                if (availableBefore - availableAfter > fieldLength) {
-                    throw new IllegalStateException(ERROR_MSG);
-                }
-                record[i] = instance;
-            }
-            /** skip vertex id fields in deserialization */
-            for (int i = offset.length + 1; i < record.length; ++i) {
-                byte[] rightData = right.getFieldData(i - offset.length);
-                int rightOffset = right.getFieldStart(i - offset.length);
-                bbis.setByteArray(rightData, rightOffset);
-                int fieldLength = right.getFieldLength(i - offset.length);
-
-                int availableBefore = bbis.available();
-                Object instance = recordDescriptor.getFields()[i].deserialize(di);
-                int availableAfter = bbis.available();
-                if (availableBefore - availableAfter > fieldLength) {
-                    throw new IllegalStateException(ERROR_MSG);
-                }
-                record[i] = instance;
-            }
-            return record;
-        } catch (IOException e) {
-            throw new HyracksDataException(e);
-        }
-    }
-}
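
Every field deserialization above is bracketed by available() calls so that a Writable whose readFields consumes more bytes than the stored field length fails fast instead of silently corrupting the next field. A self-contained sketch of that guard using plain java.io streams; FieldReader and readChecked are hypothetical names, not the Hyracks serializer interface:

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

// Standalone sketch of the over-read guard placed around each field deserialization.
final class BoundedReadSketch {

    interface FieldReader<T> {
        T read(DataInputStream in) throws IOException;   // stand-in for a per-field deserializer
    }

    static <T> T readChecked(byte[] data, int offset, int declaredLength, FieldReader<T> reader)
            throws IOException {
        ByteArrayInputStream bytes = new ByteArrayInputStream(data, offset, data.length - offset);
        DataInputStream in = new DataInputStream(bytes);
        int before = in.available();
        T value = reader.read(in);
        int after = in.available();
        if (before - after > declaredLength) {
            // The reader walked past the end of its own field into the next one.
            throw new IllegalStateException("field reader consumed " + (before - after)
                    + " bytes but only " + declaredLength + " were stored");
        }
        return value;
    }

    public static void main(String[] args) throws IOException {
        byte[] frame = new byte[] { 0, 0, 0, 7, 42 };     // a 4-byte int field followed by another byte
        int v = readChecked(frame, 0, 4, DataInputStream::readInt);
        System.out.println(v);                            // prints 7
    }
}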

