asterixdb-commits mailing list archives

From ima...@apache.org
Subject [08/85] [abbrv] [partial] incubator-asterixdb-hyracks git commit: Move Pregelix and Hivesterix codebase to new repositories: 1. Move Pregelix codebase to https://github.com/pregelix/pregelix; 2. Move Hivesterix codebase to https://code.google.com/p/hives
Date Fri, 24 Apr 2015 18:45:35 GMT
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/ffc967fd/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
----------------------------------------------------------------------
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
deleted file mode 100644
index 4e8d3a2..0000000
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ /dev/null
@@ -1,664 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.pregelix.core.jobgen;
-
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.logging.Logger;
-
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.VLongWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-
-import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.sort.Algorithm;
-import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
-import edu.uci.ics.hyracks.hdfs2.dataflow.HDFSReadOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
-import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
-import edu.uci.ics.pregelix.api.graph.MsgList;
-import edu.uci.ics.pregelix.api.job.PregelixJob;
-import edu.uci.ics.pregelix.api.util.BspUtils;
-import edu.uci.ics.pregelix.core.data.TypeTraits;
-import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
-import edu.uci.ics.pregelix.core.optimizer.IOptimizer;
-import edu.uci.ics.pregelix.core.util.DataflowUtils;
-import edu.uci.ics.pregelix.dataflow.ConnectorPolicyAssignmentPolicy;
-import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
-import edu.uci.ics.pregelix.dataflow.EmptyTupleSourceOperatorDescriptor;
-import edu.uci.ics.pregelix.dataflow.FinalAggregateOperatorDescriptor;
-import edu.uci.ics.pregelix.dataflow.HDFSFileWriteOperatorDescriptor;
-import edu.uci.ics.pregelix.dataflow.KeyValueParserFactory;
-import edu.uci.ics.pregelix.dataflow.MaterializingReadOperatorDescriptor;
-import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
-import edu.uci.ics.pregelix.dataflow.TerminationStateWriterOperatorDescriptor;
-import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
-import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinFunctionUpdateOperatorDescriptor;
-import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinOperatorDescriptor;
-import edu.uci.ics.pregelix.dataflow.std.RuntimeHookOperatorDescriptor;
-import edu.uci.ics.pregelix.dataflow.std.TreeIndexBulkReLoadOperatorDescriptor;
-import edu.uci.ics.pregelix.dataflow.std.TreeSearchFunctionUpdateOperatorDescriptor;
-import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
-import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
-import edu.uci.ics.pregelix.runtime.function.ComputeUpdateFunctionFactory;
-import edu.uci.ics.pregelix.runtime.function.StartComputeUpdateFunctionFactory;
-import edu.uci.ics.pregelix.runtime.touchpoint.MergePartitionComputerFactory;
-import edu.uci.ics.pregelix.runtime.touchpoint.PostSuperStepRuntimeHookFactory;
-import edu.uci.ics.pregelix.runtime.touchpoint.PreSuperStepRuntimeHookFactory;
-import edu.uci.ics.pregelix.runtime.touchpoint.RuntimeHookFactory;
-
-public class JobGenInnerJoin extends JobGen {
-    private static final Logger LOGGER = Logger.getLogger(JobGen.class.getName());
-
-    public JobGenInnerJoin(PregelixJob job, IOptimizer optimizer) {
-        super(job, optimizer);
-    }
-
-    public JobGenInnerJoin(PregelixJob job, String jobId, IOptimizer optimizer) {
-        super(job, jobId, optimizer);
-    }
-
-    protected JobSpecification generateFirstIteration(int iteration) throws HyracksException {
-        Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
-        Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
-        Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
-        String[] partialAggregateValueClassNames = BspUtils.getPartialAggregateValueClassNames(conf);
-
-        IConfigurationFactory confFactory = getConfigurationFactory();
-        JobSpecification spec = new JobSpecification(frameSize);
-
-        /**
-         * construct empty tuple operator
-         */
-        EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
-        setLocationConstraint(spec, emptyTupleSource);
-
-        /** construct runtime hook */
-        RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
-                new PreSuperStepRuntimeHookFactory(jobId, confFactory));
-        setLocationConstraint(spec, preSuperStep);
-
-        /**
-         * construct drop index operator
-         */
-        IFileSplitProvider secondaryFileSplitProvider = getFileSplitProvider(jobId, SECONDARY_INDEX_ODD);
-        IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
-
-        /**
-         * construct btree search and function call update operator
-         */
-        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
-                vertexIdClass.getName(), vertexClass.getName());
-        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
-        comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
-
-        ITypeTraits[] typeTraits = new ITypeTraits[2];
-        typeTraits[0] = new TypeTraits(false);
-        typeTraits[1] = new TypeTraits(false);
-
-        RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
-                VLongWritable.class.getName());
-        RecordDescriptor rdPartialAggregate = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
-                partialAggregateValueClassNames);
-        IConfigurationFactory configurationFactory = getConfigurationFactory();
-        IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
-        IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
-                getConfigurationFactory(), vertexIdClass.getName(), vertexClass.getName());
-        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
-                MsgList.class.getName());
-        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
-                vertexIdClass.getName(), messageValueClass.getName());
-        RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
-                vertexClass.getName());
-        RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(conf, vertexIdClass.getName());
-
-        TreeSearchFunctionUpdateOperatorDescriptor scanner = new TreeSearchFunctionUpdateOperatorDescriptor(spec,
-                recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
-                getIndexDataflowHelperFactory(), inputRdFactory, 6, new StartComputeUpdateFunctionFactory(confFactory),
-                preHookFactory, null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete, rdFinal);
-        setLocationConstraint(spec, scanner);
-
-        /**
-         * termination state write operator
-         */
-        TerminationStateWriterOperatorDescriptor terminateWriter = new TerminationStateWriterOperatorDescriptor(spec,
-                configurationFactory, jobId);
-        PartitionConstraintHelper.addPartitionCountConstraint(spec, terminateWriter, 1);
-
-        /**
-         * final aggregate write operator
-         */
-        IRecordDescriptorFactory aggRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
-                getConfigurationFactory(), partialAggregateValueClassNames);
-        FinalAggregateOperatorDescriptor finalAggregator = new FinalAggregateOperatorDescriptor(spec,
-                configurationFactory, aggRdFactory, jobId);
-        PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
-
-        /**
-         * construct bulk-load index operator
-         */
-        int[] fieldPermutation = new int[] { 0, 1 };
-        int[] keyFields = new int[] { 0 };
-        IBinaryComparatorFactory[] indexCmpFactories = new IBinaryComparatorFactory[1];
-        indexCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration + 1,
-                WritableComparator.get(vertexIdClass).getClass());
-        TreeIndexBulkReLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkReLoadOperatorDescriptor(spec,
-                storageManagerInterface, lcManagerProvider, secondaryFileSplitProvider, typeTraits, indexCmpFactories,
-                fieldPermutation, keyFields, DEFAULT_BTREE_FILL_FACTOR, getIndexDataflowHelperFactory());
-        setLocationConstraint(spec, btreeBulkLoad);
-
-        /**
-         * construct group-by operator pipeline
-         */
-        Pair<IOperatorDescriptor, IOperatorDescriptor> groupOps = generateGroupingOperators(spec, iteration,
-                vertexIdClass);
-        IOperatorDescriptor groupStartOperator = groupOps.getLeft();
-        IOperatorDescriptor groupEndOperator = groupOps.getRight();
-
-        /**
-         * construct the materializing write operator
-         */
-        MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
-                jobId, iteration);
-        setLocationConstraint(spec, materialize);
-
-        /**
-         * do pre- & post- super step
-         */
-        RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
-                new PostSuperStepRuntimeHookFactory(jobId));
-        setLocationConstraint(spec, postSuperStep);
-
-        /** construct empty sink operator */
-        EmptySinkOperatorDescriptor emptySink = new EmptySinkOperatorDescriptor(spec);
-        setLocationConstraint(spec, emptySink);
-
-        /**
-         * add the insert operator to insert vertexes
-         */
-        TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
-                spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
-                null, NoOpOperationCallbackFactory.INSTANCE);
-        setLocationConstraint(spec, insertOp);
-
-        /**
-         * add the delete operator to delete vertexes
-         */
-        int[] fieldPermutationDelete = new int[] { 0 };
-        TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
-                spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
-                getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
-        setLocationConstraint(spec, deleteOp);
-
-        /** construct empty sink operator */
-        EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
-        setLocationConstraint(spec, emptySink3);
-
-        /** construct empty sink operator */
-        EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
-        setLocationConstraint(spec, emptySink4);
-
-        ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
-        ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
-
-        /** connect all operators **/
-        spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, scanner, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, groupStartOperator, 0);
-        spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), scanner, 1,
-                terminateWriter, 0);
-        spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), scanner, 2,
-                finalAggregator, 0);
-
-        /**
-         * connect the insert/delete operator
-         */
-        spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), scanner, 3, insertOp, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
-        spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), scanner, 4, deleteOp, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
-
-        spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), scanner, 5, btreeBulkLoad, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), groupEndOperator, 0, materialize, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink, 0);
-
-        spec.addRoot(emptySink);
-        spec.addRoot(btreeBulkLoad);
-        spec.addRoot(terminateWriter);
-        spec.addRoot(finalAggregator);
-        spec.addRoot(emptySink3);
-        spec.addRoot(emptySink4);
-
-        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy(spec));
-        spec.setFrameSize(frameSize);
-        return spec;
-    }
-
-    @Override
-    protected JobSpecification generateNonFirstIteration(int iteration) throws HyracksException {
-        Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
-        Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
-        Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
-        String[] partialAggregateValueClassNames = BspUtils.getPartialAggregateValueClassNames(conf);
-        JobSpecification spec = new JobSpecification(frameSize);
-
-        /**
-         * source aggregate
-         */
-        int[] keyFields = new int[] { 0 };
-        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
-                vertexIdClass.getName(), messageValueClass.getName());
-        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
-        comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
-        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
-                MsgList.class.getName());
-        RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
-                vertexClass.getName());
-        RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(conf, vertexIdClass.getName());
-
-        /**
-         * construct empty tuple operator
-         */
-        EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
-        setLocationConstraint(spec, emptyTupleSource);
-
-        /**
-         * construct pre-superstep
-         */
-        IConfigurationFactory confFactory = getConfigurationFactory();
-        RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
-                new PreSuperStepRuntimeHookFactory(jobId, confFactory));
-        setLocationConstraint(spec, preSuperStep);
-
-        /**
-         * construct the materializing write operator
-         */
-        MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
-                true, jobId, iteration);
-        setLocationConstraint(spec, materializeRead);
-
-        /**
-         * construct the index-set-union operator
-         */
-        String readFile = iteration % 2 == 0 ? SECONDARY_INDEX_ODD : SECONDARY_INDEX_EVEN;
-        IFileSplitProvider secondaryFileSplitProviderRead = getFileSplitProvider(jobId, readFile);
-
-        ITypeTraits[] typeTraits = new ITypeTraits[2];
-        typeTraits[0] = new TypeTraits(false);
-        typeTraits[1] = new TypeTraits(false);
-        IndexNestedLoopJoinOperatorDescriptor setUnion = new IndexNestedLoopJoinOperatorDescriptor(spec, rdFinal,
-                storageManagerInterface, lcManagerProvider, secondaryFileSplitProviderRead, typeTraits,
-                comparatorFactories, true, keyFields, keyFields, true, true, getIndexDataflowHelperFactory(), true);
-        setLocationConstraint(spec, setUnion);
-
-        /**
-         * construct index-join-function-update operator
-         */
-        IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
-        RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
-                VLongWritable.class.getName());
-        RecordDescriptor rdPartialAggregate = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
-                partialAggregateValueClassNames);
-        IConfigurationFactory configurationFactory = getConfigurationFactory();
-        IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
-        IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
-                getConfigurationFactory(), vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(),
-                vertexClass.getName());
-
-        IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
-                spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
-                JobGenUtil.getForwardScan(iteration), keyFields, keyFields, true, true,
-                getIndexDataflowHelperFactory(), inputRdFactory, 6, new ComputeUpdateFunctionFactory(confFactory),
-                preHookFactory, null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete, rdFinal);
-        setLocationConstraint(spec, join);
-
-        /**
-         * construct bulk-load index operator
-         */
-        int[] fieldPermutation = new int[] { 0, 1 };
-        IBinaryComparatorFactory[] indexCmpFactories = new IBinaryComparatorFactory[1];
-        indexCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration + 1,
-                WritableComparator.get(vertexIdClass).getClass());
-        String writeFile = iteration % 2 == 0 ? SECONDARY_INDEX_EVEN : SECONDARY_INDEX_ODD;
-        IFileSplitProvider secondaryFileSplitProviderWrite = getFileSplitProvider(jobId, writeFile);
-        TreeIndexBulkReLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkReLoadOperatorDescriptor(spec,
-                storageManagerInterface, lcManagerProvider, secondaryFileSplitProviderWrite, typeTraits,
-                indexCmpFactories, fieldPermutation, keyFields, DEFAULT_BTREE_FILL_FACTOR,
-                getIndexDataflowHelperFactory());
-        setLocationConstraint(spec, btreeBulkLoad);
-
-        /**
-         * construct group-by operator pipeline
-         */
-        Pair<IOperatorDescriptor, IOperatorDescriptor> groupOps = generateGroupingOperators(spec, iteration,
-                vertexIdClass);
-        IOperatorDescriptor groupStartOperator = groupOps.getLeft();
-        IOperatorDescriptor groupEndOperator = groupOps.getRight();
-
-        /**
-         * construct the materializing write operator
-         */
-        MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
-                jobId, iteration);
-        setLocationConstraint(spec, materialize);
-
-        /** construct runtime hook */
-        RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
-                new PostSuperStepRuntimeHookFactory(jobId));
-        setLocationConstraint(spec, postSuperStep);
-
-        /** construct empty sink operator */
-        EmptySinkOperatorDescriptor emptySink = new EmptySinkOperatorDescriptor(spec);
-        setLocationConstraint(spec, emptySink);
-
-        /**
-         * termination state write operator
-         */
-        TerminationStateWriterOperatorDescriptor terminateWriter = new TerminationStateWriterOperatorDescriptor(spec,
-                configurationFactory, jobId);
-        PartitionConstraintHelper.addPartitionCountConstraint(spec, terminateWriter, 1);
-
-        /**
-         * final aggregate write operator
-         */
-        IRecordDescriptorFactory aggRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
-                getConfigurationFactory(), partialAggregateValueClassNames);
-        FinalAggregateOperatorDescriptor finalAggregator = new FinalAggregateOperatorDescriptor(spec,
-                configurationFactory, aggRdFactory, jobId);
-        PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
-
-        /**
-         * add the insert operator to insert vertexes
-         */
-        TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
-                spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
-                null, NoOpOperationCallbackFactory.INSTANCE);
-        setLocationConstraint(spec, insertOp);
-
-        /**
-         * add the delete operator to delete vertexes
-         */
-        int[] fieldPermutationDelete = new int[] { 0 };
-        TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
-                spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
-                getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
-        setLocationConstraint(spec, deleteOp);
-
-        /** construct empty sink operator */
-        EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
-        setLocationConstraint(spec, emptySink3);
-
-        /** construct empty sink operator */
-        EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
-        setLocationConstraint(spec, emptySink4);
-
-        ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
-        ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
-        /** connect all operators **/
-        spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, materializeRead, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), materializeRead, 0, setUnion, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), setUnion, 0, join, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), join, 0, groupStartOperator, 0);
-        spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), join, 1,
-                terminateWriter, 0);
-        spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), join, 2,
-                finalAggregator, 0);
-
-        /**
-         * connect the insert/delete operator
-         */
-        spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), join, 3, insertOp, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
-        spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), join, 4, deleteOp, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
-
-        spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), join, 5, btreeBulkLoad, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), groupEndOperator, 0, materialize, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink, 0);
-
-        spec.addRoot(emptySink);
-        spec.addRoot(btreeBulkLoad);
-        spec.addRoot(terminateWriter);
-        spec.addRoot(emptySink3);
-        spec.addRoot(emptySink4);
-
-        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy(spec));
-        spec.setFrameSize(frameSize);
-        return spec;
-    }
-
-    /** generate plan specific state checkpointing */
-    protected JobSpecification[] generateStateCheckpointing(int lastSuccessfulIteration) throws HyracksException {
-        JobSpecification[] msgCkpSpecs = super.generateStateCheckpointing(lastSuccessfulIteration);
-
-        /** generate secondary index checkpoint */
-        PregelixJob tmpJob = this.createCloneJob("Secondary index checkpointing for job " + jobId, pregelixJob);
-
-        JobSpecification secondaryBTreeCkp = generateSecondaryBTreeCheckpoint(lastSuccessfulIteration, tmpJob);
-
-        JobSpecification[] specs = new JobSpecification[msgCkpSpecs.length + 1];
-        for (int i = 0; i < msgCkpSpecs.length; i++) {
-            specs[i] = msgCkpSpecs[i];
-        }
-        specs[specs.length - 1] = secondaryBTreeCkp;
-        return specs;
-    }
-
-    /**
-     * generate plan specific checkpoint loading
-     */
-    @Override
-    protected JobSpecification[] generateStateCheckpointLoading(int lastSuccessfulIteration, PregelixJob job)
-            throws HyracksException {
-        /** generate message checkpoint load */
-        JobSpecification[] msgCkpSpecs = super.generateStateCheckpointLoading(lastSuccessfulIteration, job);
-
-        /** generate secondary index checkpoint load */
-        PregelixJob tmpJob = this.createCloneJob("Secondary index checkpoint loading for job " + jobId, pregelixJob);
-        tmpJob.setOutputFormatClass(SequenceFileOutputFormat.class);
-        JobSpecification secondaryBTreeCkpLoad = generateSecondaryBTreeCheckpointLoad(lastSuccessfulIteration, tmpJob);
-        JobSpecification[] specs = new JobSpecification[msgCkpSpecs.length + 1];
-        for (int i = 0; i < msgCkpSpecs.length; i++) {
-            specs[i] = msgCkpSpecs[i];
-        }
-        specs[specs.length - 1] = secondaryBTreeCkpLoad;
-        return specs;
-    }
-
-    @SuppressWarnings({ "unchecked", "rawtypes" })
-    private JobSpecification generateSecondaryBTreeCheckpointLoad(int lastSuccessfulIteration, PregelixJob job)
-            throws HyracksException {
-        Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(job.getConfiguration());
-        JobSpecification spec = new JobSpecification(frameSize);
-
-        String checkpointPath = BspUtils.getSecondaryIndexCheckpointPath(conf, lastSuccessfulIteration);
-        PregelixJob tmpJob = createCloneJob("State checkpoint loading for job " + jobId, job);
-        tmpJob.setInputFormatClass(SequenceFileInputFormat.class);
-        try {
-            FileInputFormat.setInputPaths(tmpJob, checkpointPath);
-        } catch (IOException e) {
-            throw new HyracksException(e);
-        }
-
-        /**
-         * HDFS read operator
-         */
-        List<InputSplit> splits = new ArrayList<InputSplit>();
-        try {
-            InputFormat inputFormat = org.apache.hadoop.util.ReflectionUtils.newInstance(job.getInputFormatClass(),
-                    job.getConfiguration());
-            splits = inputFormat.getSplits(tmpJob);
-            LOGGER.info("number of splits: " + splits.size());
-            for (InputSplit split : splits)
-                LOGGER.info(split.toString());
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
-        }
-        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
-                vertexIdClass.getName(), MsgList.class.getName());
-        String[] readSchedule = ClusterConfig.getHdfsScheduler().getLocationConstraints(splits);
-        HDFSReadOperatorDescriptor scanner = new HDFSReadOperatorDescriptor(spec, recordDescriptor, tmpJob, splits,
-                readSchedule, new KeyValueParserFactory());
-        setLocationConstraint(spec, scanner);
-
-        /** construct the sort operator to sort message states */
-        int[] keyFields = new int[] { 0 };
-        INormalizedKeyComputerFactory nkmFactory = JobGenUtil.getINormalizedKeyComputerFactory(conf);
-        IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
-        sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(lastSuccessfulIteration, vertexIdClass);
-        ExternalSortOperatorDescriptor sort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
-                nkmFactory, sortCmpFactories, recordDescriptor, Algorithm.QUICK_SORT);
-        setLocationConstraint(spec, sort);
-
-        /**
-         * construct bulk-load index operator
-         */
-        ITypeTraits[] typeTraits = new ITypeTraits[2];
-        typeTraits[0] = new TypeTraits(false);
-        typeTraits[1] = new TypeTraits(false);
-        int[] fieldPermutation = new int[] { 0, 1 };
-        IBinaryComparatorFactory[] indexCmpFactories = new IBinaryComparatorFactory[1];
-        indexCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(lastSuccessfulIteration + 1, WritableComparator
-                .get(vertexIdClass).getClass());
-        String writeFile = lastSuccessfulIteration % 2 == 0 ? SECONDARY_INDEX_EVEN : SECONDARY_INDEX_ODD;
-        IFileSplitProvider secondaryFileSplitProviderWrite = getFileSplitProvider(jobId, writeFile);
-        TreeIndexBulkReLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkReLoadOperatorDescriptor(spec,
-                storageManagerInterface, lcManagerProvider, secondaryFileSplitProviderWrite, typeTraits,
-                indexCmpFactories, fieldPermutation, new int[] { 0 }, DEFAULT_BTREE_FILL_FACTOR,
-                getIndexDataflowHelperFactory());
-        setLocationConstraint(spec, btreeBulkLoad);
-
-        /**
-         * connect operator descriptors
-         */
-        ITuplePartitionComputerFactory hashPartitionComputerFactory = getVertexPartitionComputerFactory();
-        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0, sort, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), sort, 0, btreeBulkLoad, 0);
-        spec.setFrameSize(frameSize);
-
-        return spec;
-    }
-
-    @SuppressWarnings({ "rawtypes" })
-    private JobSpecification generateSecondaryBTreeCheckpoint(int lastSuccessfulIteration, PregelixJob job)
-            throws HyracksException {
-        job.setOutputFormatClass(SequenceFileOutputFormat.class);
-        String checkpointPath = BspUtils.getSecondaryIndexCheckpointPath(conf, lastSuccessfulIteration);
-        FileOutputFormat.setOutputPath(job, new Path(checkpointPath));
-        job.setOutputKeyClass(BspUtils.getVertexIndexClass(job.getConfiguration()));
-        job.setOutputValueClass(MsgList.class);
-
-        Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(job.getConfiguration());
-        Class<? extends Writable> msgListClass = MsgList.class;
-        String readFile = lastSuccessfulIteration % 2 == 0 ? SECONDARY_INDEX_EVEN : SECONDARY_INDEX_ODD;
-        IFileSplitProvider secondaryFileSplitProviderRead = getFileSplitProvider(jobId, readFile);
-        JobSpecification spec = new JobSpecification(frameSize);
-        /**
-         * construct empty tuple operator
-         */
-        ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
-        DataOutput dos = tb.getDataOutput();
-        tb.reset();
-        UTF8StringSerializerDeserializer.INSTANCE.serialize("0", dos);
-        tb.addFieldEndOffset();
-        ISerializerDeserializer[] keyRecDescSers = { UTF8StringSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE };
-        RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
-        ConstantTupleSourceOperatorDescriptor emptyTupleSource = new ConstantTupleSourceOperatorDescriptor(spec,
-                keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
-        setLocationConstraint(spec, emptyTupleSource);
-
-        /**
-         * construct btree search operator
-         */
-        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
-                vertexIdClass.getName(), msgListClass.getName());
-        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
-        comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(0, vertexIdClass);
-
-        ITypeTraits[] typeTraits = new ITypeTraits[2];
-        typeTraits[0] = new TypeTraits(false);
-        typeTraits[1] = new TypeTraits(false);
-
-        BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(spec, recordDescriptor,
-                storageManagerInterface, lcManagerProvider, secondaryFileSplitProviderRead, typeTraits,
-                comparatorFactories, null, null, null, true, true, getIndexDataflowHelperFactory(), false, false, null,
-                NoOpOperationCallbackFactory.INSTANCE, null, null);
-        setLocationConstraint(spec, scanner);
-
-        /**
-         * construct write file operator
-         */
-        IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
-                getConfigurationFactory(), vertexIdClass.getName(), MsgList.class.getName());
-        HDFSFileWriteOperatorDescriptor writer = new HDFSFileWriteOperatorDescriptor(spec, job, inputRdFactory);
-        setLocationConstraint(spec, writer);
-
-        /**
-         * connect operator descriptors
-         */
-        spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, scanner, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, writer, 0);
-        spec.setFrameSize(frameSize);
-        return spec;
-    }
-
-    @Override
-    public JobSpecification[] generateCleanup() throws HyracksException {
-        JobSpecification[] cleanups = new JobSpecification[3];
-        cleanups[0] = this.dropIndex(PRIMARY_INDEX);
-        cleanups[1] = this.dropIndex(SECONDARY_INDEX_ODD);
-        cleanups[2] = this.dropIndex(SECONDARY_INDEX_EVEN);
-        return cleanups;
-    }
-}
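
Both deleted job generators build each Pregelix superstep as a hand-wired Hyracks JobSpecification: construct the operator descriptors, pin their location or partition-count constraints, connect them with one-to-one or M-to-N partitioning connectors, and register the dataflow leaves as roots. The following is a minimal sketch of that wiring pattern, assuming the edu.uci.ics.hyracks API this diff removes code against; the three operator parameters are hypothetical placeholders, and only calls that appear verbatim in the deleted code above are used.

import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;

public class JobWiringSketch {
    /** Chain source -> compute -> sink and return the finished spec. */
    public static JobSpecification wire(int frameSize, IOperatorDescriptor source,
            IOperatorDescriptor compute, IOperatorDescriptor sink) throws HyracksException {
        JobSpecification spec = new JobSpecification(frameSize);
        // One-to-one connectors keep partition i of the upstream operator
        // feeding partition i downstream, as in the deleted code.
        spec.connect(new OneToOneConnectorDescriptor(spec), source, 0, compute, 0);
        spec.connect(new OneToOneConnectorDescriptor(spec), compute, 0, sink, 0);
        // Singleton operators (e.g. the termination-state writer above) get
        // an explicit partition count of 1 instead of a location constraint.
        PartitionConstraintHelper.addPartitionCountConstraint(spec, sink, 1);
        // Every dataflow leaf must be registered as a root.
        spec.addRoot(sink);
        spec.setFrameSize(frameSize);
        return spec;
    }
}
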

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/ffc967fd/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
----------------------------------------------------------------------
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
deleted file mode 100644
index b4a12b8..0000000
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ /dev/null
@@ -1,416 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.pregelix.core.jobgen;
-
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.hadoop.io.VLongWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
-import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
-import edu.uci.ics.pregelix.api.graph.MsgList;
-import edu.uci.ics.pregelix.api.job.PregelixJob;
-import edu.uci.ics.pregelix.api.util.BspUtils;
-import edu.uci.ics.pregelix.core.data.TypeTraits;
-import edu.uci.ics.pregelix.core.optimizer.IOptimizer;
-import edu.uci.ics.pregelix.core.util.DataflowUtils;
-import edu.uci.ics.pregelix.dataflow.ConnectorPolicyAssignmentPolicy;
-import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
-import edu.uci.ics.pregelix.dataflow.EmptyTupleSourceOperatorDescriptor;
-import edu.uci.ics.pregelix.dataflow.FinalAggregateOperatorDescriptor;
-import edu.uci.ics.pregelix.dataflow.MaterializingReadOperatorDescriptor;
-import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
-import edu.uci.ics.pregelix.dataflow.TerminationStateWriterOperatorDescriptor;
-import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
-import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinFunctionUpdateOperatorDescriptor;
-import edu.uci.ics.pregelix.dataflow.std.RuntimeHookOperatorDescriptor;
-import edu.uci.ics.pregelix.dataflow.std.TreeSearchFunctionUpdateOperatorDescriptor;
-import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
-import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
-import edu.uci.ics.pregelix.runtime.function.ComputeUpdateFunctionFactory;
-import edu.uci.ics.pregelix.runtime.function.StartComputeUpdateFunctionFactory;
-import edu.uci.ics.pregelix.runtime.touchpoint.MergePartitionComputerFactory;
-import edu.uci.ics.pregelix.runtime.touchpoint.MsgListNullWriterFactory;
-import edu.uci.ics.pregelix.runtime.touchpoint.PostSuperStepRuntimeHookFactory;
-import edu.uci.ics.pregelix.runtime.touchpoint.PreSuperStepRuntimeHookFactory;
-import edu.uci.ics.pregelix.runtime.touchpoint.RuntimeHookFactory;
-import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdNullWriterFactory;
-
-public class JobGenOuterJoin extends JobGen {
-
-    public JobGenOuterJoin(PregelixJob job, IOptimizer optimizer) {
-        super(job, optimizer);
-    }
-
-    public JobGenOuterJoin(PregelixJob job, String jobId, IOptimizer optimizer) {
-        super(job, jobId, optimizer);
-    }
-
-    @Override
-    protected JobSpecification generateFirstIteration(int iteration) throws HyracksException {
-        Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
-        Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
-        Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
-        String[] partialAggregateValueClassNames = BspUtils.getPartialAggregateValueClassNames(conf);
-
-        IConfigurationFactory confFactory = getConfigurationFactory();
-        JobSpecification spec = new JobSpecification(frameSize);
-
-        /**
-         * construct empty tuple operator
-         */
-        EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
-        setLocationConstraint(spec, emptyTupleSource);
-
-        /** construct runtime hook */
-        RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
-                new PreSuperStepRuntimeHookFactory(jobId, confFactory));
-        setLocationConstraint(spec, preSuperStep);
-
-        /**
-         * construct btree search function update operator
-         */
-        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
-                vertexIdClass.getName(), vertexClass.getName());
-        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
-        comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
-        IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
-
-        ITypeTraits[] typeTraits = new ITypeTraits[2];
-        typeTraits[0] = new TypeTraits(false);
-        typeTraits[1] = new TypeTraits(false);
-
-        RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
-                VLongWritable.class.getName());
-        RecordDescriptor rdPartialAggregate = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
-                partialAggregateValueClassNames);
-        IConfigurationFactory configurationFactory = getConfigurationFactory();
-        IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
-        IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
-                getConfigurationFactory(), vertexIdClass.getName(), vertexClass.getName());
-        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
-                vertexIdClass.getName(), messageValueClass.getName());
-        RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
-                vertexClass.getName());
-        RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(conf, vertexIdClass.getName());
-        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
-                MsgList.class.getName());
-
-        TreeSearchFunctionUpdateOperatorDescriptor scanner = new TreeSearchFunctionUpdateOperatorDescriptor(spec,
-                recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
-                getIndexDataflowHelperFactory(), inputRdFactory, 5, new StartComputeUpdateFunctionFactory(confFactory),
-                preHookFactory, null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete);
-        setLocationConstraint(spec, scanner);
-
-        /**
-         * construct group-by operator pipeline
-         */
-        Pair<IOperatorDescriptor, IOperatorDescriptor> groupOps = generateGroupingOperators(spec, iteration,
-                vertexIdClass);
-        IOperatorDescriptor groupStartOperator = groupOps.getLeft();
-        IOperatorDescriptor groupEndOperator = groupOps.getRight();
-
-        /**
-         * construct the materializing write operator
-         */
-        MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
-                jobId, iteration);
-        setLocationConstraint(spec, materialize);
-
-        RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
-                new PostSuperStepRuntimeHookFactory(jobId));
-        setLocationConstraint(spec, postSuperStep);
-
-        /** construct empty sink operator */
-        EmptySinkOperatorDescriptor emptySink2 = new EmptySinkOperatorDescriptor(spec);
-        setLocationConstraint(spec, emptySink2);
-
-        /**
-         * termination state write operator
-         */
-        TerminationStateWriterOperatorDescriptor terminateWriter = new TerminationStateWriterOperatorDescriptor(spec,
-                configurationFactory, jobId);
-        PartitionConstraintHelper.addPartitionCountConstraint(spec, terminateWriter, 1);
-        ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
-
-        /**
-         * final aggregate write operator
-         */
-        IRecordDescriptorFactory aggRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
-                getConfigurationFactory(), partialAggregateValueClassNames);
-        FinalAggregateOperatorDescriptor finalAggregator = new FinalAggregateOperatorDescriptor(spec,
-                configurationFactory, aggRdFactory, jobId);
-        PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
-
-        /**
-         * add the insert operator to insert vertexes
-         */
-        int[] fieldPermutation = new int[] { 0, 1 };
-        TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
-                spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
-                null, NoOpOperationCallbackFactory.INSTANCE);
-        setLocationConstraint(spec, insertOp);
-
-        /**
-         * add the delete operator to delete vertexes
-         */
-        int[] fieldPermutationDelete = new int[] { 0 };
-        TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
-                spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
-                getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
-        setLocationConstraint(spec, deleteOp);
-
-        /** construct empty sink operator */
-        EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
-        setLocationConstraint(spec, emptySink3);
-
-        /** construct empty sink operator */
-        EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
-        setLocationConstraint(spec, emptySink4);
-
-        ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
-        /** connect all operators **/
-        spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, scanner, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, groupStartOperator, 0);
-        spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), scanner, 1,
-                terminateWriter, 0);
-        spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), scanner, 2,
-                finalAggregator, 0);
-
-        /**
-         * connect the insert/delete operator
-         */
-        spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), scanner, 3, insertOp, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
-        spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), scanner, 4, deleteOp, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
-
-        /**
-         * connect the group-by operator
-         */
-        spec.connect(new OneToOneConnectorDescriptor(spec), groupEndOperator, 0, materialize, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink2, 0);
-
-        spec.addRoot(terminateWriter);
-        spec.addRoot(finalAggregator);
-        spec.addRoot(emptySink2);
-        spec.addRoot(emptySink3);
-        spec.addRoot(emptySink4);
-
-        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy(spec));
-        spec.setFrameSize(frameSize);
-        return spec;
-    }
-
-    @Override
-    protected JobSpecification generateNonFirstIteration(int iteration) throws HyracksException {
-        Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
-        Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
-        Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
-        String[] partialAggregateValueClassNames = BspUtils.getPartialAggregateValueClassNames(conf);
-        JobSpecification spec = new JobSpecification(frameSize);
-
-        /**
-         * source aggregate
-         */
-        int[] keyFields = new int[] { 0 };
-        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
-                vertexIdClass.getName(), messageValueClass.getName());
-        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
-        comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
-        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
-                MsgList.class.getName());
-        RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
-                vertexClass.getName());
-        RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(conf, vertexIdClass.getName());
-
-        /**
-         * construct empty tuple operator
-         */
-        EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
-        setLocationConstraint(spec, emptyTupleSource);
-
-        /**
-         * construct pre-superstep hook
-         */
-        IConfigurationFactory confFactory = getConfigurationFactory();
-        RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
-                new PreSuperStepRuntimeHookFactory(jobId, confFactory));
-        setLocationConstraint(spec, preSuperStep);
-
-        /**
-         * construct the materializing write operator
-         */
-        MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
-                true, jobId, iteration);
-        setLocationConstraint(spec, materializeRead);
-
-        /**
-         * construct index join function update operator
-         */
-        IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
-        ITypeTraits[] typeTraits = new ITypeTraits[2];
-        typeTraits[0] = new TypeTraits(false);
-        typeTraits[1] = new TypeTraits(false);
-        INullWriterFactory[] nullWriterFactories = new INullWriterFactory[2];
-        nullWriterFactories[0] = VertexIdNullWriterFactory.INSTANCE;
-        nullWriterFactories[1] = MsgListNullWriterFactory.INSTANCE;
-
-        RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
-                VLongWritable.class.getName());
-        RecordDescriptor rdPartialAggregate = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
-                partialAggregateValueClassNames);
-        IConfigurationFactory configurationFactory = getConfigurationFactory();
-        IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
-        IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
-                getConfigurationFactory(), vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(),
-                vertexClass.getName());
-
-        IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
-                spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
-                JobGenUtil.getForwardScan(iteration), keyFields, keyFields, true, true,
-                getIndexDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
-                new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
-                rdPartialAggregate, rdInsert, rdDelete);
-        setLocationConstraint(spec, join);
-
-        /**
-         * construct group-by operator pipeline
-         */
-        Pair<IOperatorDescriptor, IOperatorDescriptor> groupOps = generateGroupingOperators(spec, iteration,
-                vertexIdClass);
-        IOperatorDescriptor groupStartOperator = groupOps.getLeft();
-        IOperatorDescriptor groupEndOperator = groupOps.getRight();
-
-        /**
-         * construct the materializing write operator
-         */
-        MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
-                jobId, iteration);
-        setLocationConstraint(spec, materialize);
-
-        /** construct runtime hook */
-        RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
-                new PostSuperStepRuntimeHookFactory(jobId));
-        setLocationConstraint(spec, postSuperStep);
-
-        /** construct empty sink operator */
-        EmptySinkOperatorDescriptor emptySink = new EmptySinkOperatorDescriptor(spec);
-        setLocationConstraint(spec, emptySink);
-
-        /**
-         * termination state write operator
-         */
-        TerminationStateWriterOperatorDescriptor terminateWriter = new TerminationStateWriterOperatorDescriptor(spec,
-                configurationFactory, jobId);
-        PartitionConstraintHelper.addPartitionCountConstraint(spec, terminateWriter, 1);
-
-        /**
-         * final aggregate write operator
-         */
-        IRecordDescriptorFactory aggRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
-                getConfigurationFactory(), partialAggregateValueClassNames);
-        FinalAggregateOperatorDescriptor finalAggregator = new FinalAggregateOperatorDescriptor(spec,
-                configurationFactory, aggRdFactory, jobId);
-        PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
-
-        /**
-         * add the insert operator to insert vertices
-         */
-        int[] fieldPermutation = new int[] { 0, 1 };
-        TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
-                spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
-                null, NoOpOperationCallbackFactory.INSTANCE);
-        setLocationConstraint(spec, insertOp);
-
-        /**
-         * add the delete operator to delete vertices
-         */
-        int[] fieldPermutationDelete = new int[] { 0 };
-        TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
-                spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
-                getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
-        setLocationConstraint(spec, deleteOp);
-
-        /** construct empty sink operator */
-        EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
-        setLocationConstraint(spec, emptySink3);
-
-        /** construct empty sink operator */
-        EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
-        setLocationConstraint(spec, emptySink4);
-
-        ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
-        ITuplePartitionComputerFactory partitionFactory = getVertexPartitionComputerFactory();
-
-        /** connect all operators **/
-        spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, materializeRead, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), materializeRead, 0, join, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), join, 0, groupStartOperator, 0);
-        spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), join, 1,
-                terminateWriter, 0);
-        spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), join, 2,
-                finalAggregator, 0);
-
-        /**
-         * connect the insert/delete operator
-         */
-        spec.connect(new MToNPartitioningConnectorDescriptor(spec, partitionFactory), join, 3, insertOp, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
-        spec.connect(new MToNPartitioningConnectorDescriptor(spec, partitionFactory), join, 4, deleteOp, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
-
-        spec.connect(new OneToOneConnectorDescriptor(spec), groupEndOperator, 0, materialize, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink, 0);
-
-        spec.addRoot(terminateWriter);
-        spec.addRoot(finalAggregator);
-        spec.addRoot(emptySink);
-
-        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy(spec));
-        spec.setFrameSize(frameSize);
-        return spec;
-    }
-
-    @Override
-    public JobSpecification[] generateCleanup() throws HyracksException {
-        JobSpecification[] cleanups = new JobSpecification[1];
-        cleanups[0] = this.dropIndex(PRIMARY_INDEX);
-        return cleanups;
-    }
-
-}

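The deleted job generator follows one recipe throughout: construct each operator descriptor, pin it to node controllers via setLocationConstraint, wire the operators with one-to-one or m-to-n connectors, and register the terminal operators as roots. The sketch below reduces that recipe to a two-operator pipeline. It reuses only calls that appear in the diff; the Pregelix operator package (edu.uci.ics.pregelix.dataflow) and the no-arg JobSpecification constructor are assumptions matching this era of the codebase, not confirmed by this commit.

// A minimal sketch of the superstep wiring pattern; import paths for the
// Pregelix operators are assumed, not confirmed by this commit.
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.EmptyTupleSourceOperatorDescriptor;

public class SuperstepSkeleton {
    public static JobSpecification build(int frameSize) {
        JobSpecification spec = new JobSpecification();
        // a no-op source and sink stand in for the full join/group-by pipeline
        EmptyTupleSourceOperatorDescriptor source = new EmptyTupleSourceOperatorDescriptor(spec);
        EmptySinkOperatorDescriptor sink = new EmptySinkOperatorDescriptor(spec);
        // operators in the same partition are chained one-to-one; repartitioning
        // steps would use MToNPartitioningConnectorDescriptor instead
        spec.connect(new OneToOneConnectorDescriptor(spec), source, 0, sink, 0);
        // roots mark the operators that terminate the DAG
        spec.addRoot(sink);
        spec.setFrameSize(frameSize);
        return spec;
    }
}
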
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/ffc967fd/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenUtil.java
----------------------------------------------------------------------
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenUtil.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenUtil.java
deleted file mode 100644
index 9f2a66d..0000000
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenUtil.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.pregelix.core.jobgen;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.WritableComparator;
-
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
-import edu.uci.ics.pregelix.api.graph.NormalizedKeyComputer;
-import edu.uci.ics.pregelix.api.util.BspUtils;
-import edu.uci.ics.pregelix.core.runtime.touchpoint.RawBinaryComparatorFactory;
-import edu.uci.ics.pregelix.core.runtime.touchpoint.RawNormalizedKeyComputerFactory;
-import edu.uci.ics.pregelix.core.runtime.touchpoint.WritableComparingBinaryComparatorFactory;
-import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdNormalizedKeyComputerFactory;
-
-@SuppressWarnings({ "rawtypes" })
-public class JobGenUtil {
-
-    /**
-     * Get the normalized key computer factory used to sort messages. The raw
-     * factory is byte-order based, so the same instance serves every iteration.
-     * 
-     * @param conf
-     *            the job configuration
-     * @return the normalized key computer factory
-     */
-    public static INormalizedKeyComputerFactory getINormalizedKeyComputerFactory(Configuration conf) {
-        return RawNormalizedKeyComputerFactory.INSTANCE;
-    }
-
-    /**
-     * Get the binary comparator factory used to sort messages in an iteration.
-     * The raw byte-wise comparator is returned regardless of the iteration, so
-     * the sort order no longer alternates between supersteps.
-     * 
-     * @param iteration
-     *            the current iteration number (unused)
-     * @param keyClass
-     *            the vertex id class (unused)
-     * @return the binary comparator factory
-     */
-    public static IBinaryComparatorFactory getIBinaryComparatorFactory(int iteration, Class keyClass) {
-        return RawBinaryComparatorFactory.INSTANCE;
-    }
-
-    /**
-     * Get the normalized key computer factory for the final output job, or
-     * null when no custom NormalizedKeyComputer is configured.
-     * 
-     * @param conf
-     *            the job configuration
-     * @return the normalized key computer factory, or null
-     */
-    public static INormalizedKeyComputerFactory getFinalNormalizedKeyComputerFactory(Configuration conf) {
-        Class<? extends NormalizedKeyComputer> clazz = BspUtils.getNormalizedKeyComputerClass(conf);
-        if (clazz.equals(NormalizedKeyComputer.class)) {
-            return null;
-        }
-        return new VertexIdNormalizedKeyComputerFactory(clazz);
-    }
-
-    /**
-     * Get the comparator for the final output job, backed by the vertex id
-     * type's registered WritableComparator.
-     * 
-     * @param vertexIdClass
-     *            the vertex id class
-     * @return the binary comparator factory
-     */
-    @SuppressWarnings("unchecked")
-    public static IBinaryComparatorFactory getFinalBinaryComparatorFactory(Class vertexIdClass) {
-        return new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass).getClass());
-    }
-
-    /**
-     * Get the B-tree scan direction for an iteration. With the raw comparator
-     * keeping keys in a fixed order, a forward scan is always used.
-     * 
-     * @param iteration
-     *            the current iteration number (unused)
-     * @return true, meaning a forward scan
-     */
-    public static boolean getForwardScan(int iteration) {
-        return true;
-    }
-
-}

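JobGenUtil's key decision is which comparator to hand the sorters: the raw byte-wise factory during supersteps, and a WritableComparator-backed factory for the final output job. The sketch below contrasts the two strategies on stock Hadoop types only; note the raw comparison is correct only when the key's serialized form is order-preserving.

// Contrast of the two comparison strategies, using only stock Hadoop API.
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.io.WritableComparator;

public class ComparatorDemo {
    public static void main(String[] args) throws Exception {
        DataOutputBuffer a = new DataOutputBuffer();
        DataOutputBuffer b = new DataOutputBuffer();
        new VLongWritable(3L).write(a);
        new VLongWritable(10L).write(b);
        // final-output style: the type's WritableComparator orders numerically
        WritableComparator cmp = WritableComparator.get(VLongWritable.class);
        System.out.println(cmp.compare(a.getData(), 0, a.getLength(),
                b.getData(), 0, b.getLength()) < 0); // true: 3 before 10
        // superstep style: raw unsigned byte order; valid here because these
        // small values serialize to single order-preserving bytes
        System.out.println(WritableComparator.compareBytes(a.getData(), 0, a.getLength(),
                b.getData(), 0, b.getLength()) < 0); // true
    }
}
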
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/ffc967fd/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
----------------------------------------------------------------------
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
deleted file mode 100644
index d1f8b65..0000000
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
+++ /dev/null
@@ -1,279 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.pregelix.core.jobgen.clusterconfig;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Random;
-import java.util.Set;
-import java.util.TreeMap;
-
-import org.apache.hadoop.mapreduce.InputSplit;
-
-import edu.uci.ics.hyracks.api.client.HyracksConnection;
-import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
-import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
-import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
-import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.hdfs2.scheduler.Scheduler;
-
-public class ClusterConfig {
-
-    private static String[] NCs;
-    private static String storePropertiesPath = "conf/stores.properties";
-    private static String clusterPropertiesPath = "conf/cluster.properties";
-    private static Properties clusterProperties = new Properties();
-    private static Map<String, List<String>> ipToNcMapping;
-    private static String[] stores;
-    private static Scheduler hdfsScheduler;
-    private static Set<String> blackListNodes = new HashSet<String>();
-    private static IHyracksClientConnection hcc;
-    private static final int DEFAULT_CC_HTTP_PORT = 16001;
-
-    /**
-     * Let tests override the store properties file path.
-     * 
-     * @param storePropertiesPath
-     *            the store properties file path
-     */
-    public static void setStorePath(String storePropertiesPath) throws HyracksException {
-        ClusterConfig.storePropertiesPath = storePropertiesPath;
-    }
-
-    public static void setClusterPropertiesPath(String clusterPropertiesPath) throws HyracksException {
-        ClusterConfig.clusterPropertiesPath = clusterPropertiesPath;
-    }
-
-    /**
-     * Get the NC names running on one IP address.
-     * 
-     * @param ipAddress
-     *            the IP address to look up
-     * @return the list of NC names on that address
-     * @throws HyracksException
-     */
-    public static List<String> getNCNames(String ipAddress) throws HyracksException {
-        return ipToNcMapping.get(ipAddress);
-    }
-
-    /**
-     * Get a file split provider (one split per NC/store pair), for tests only.
-     * 
-     * @param jobId
-     *            the job id used in the split path
-     * @param indexName
-     *            the index name used in the split path
-     * @return the file split provider
-     * @throws HyracksException
-     */
-    public static IFileSplitProvider getFileSplitProvider(String jobId, String indexName) throws HyracksException {
-        FileSplit[] fileSplits = new FileSplit[stores.length * NCs.length];
-        int i = 0;
-        for (String nc : NCs) {
-            for (String st : stores) {
-                FileSplit split = new FileSplit(nc, st + File.separator + nc + "-data" + File.separator + jobId
-                        + File.separator + indexName);
-                fileSplits[i++] = split;
-            }
-        }
-        return new ConstantFileSplitProvider(fileSplits);
-    }
-
-    private static void loadStores() throws HyracksException {
-        Properties properties = new Properties();
-        try {
-            properties.load(new FileInputStream(storePropertiesPath));
-        } catch (IOException e) {
-            throw new HyracksDataException(e);
-        }
-        String store = properties.getProperty("store");
-        stores = store.split(",");
-    }
-
-    private static void loadClusterProperties() throws HyracksException {
-        try {
-            clusterProperties.load(new FileInputStream(clusterPropertiesPath));
-        } catch (IOException e) {
-            throw new HyracksDataException(e);
-        }
-    }
-
-    public static int getFrameSize() {
-        return Integer.parseInt(clusterProperties.getProperty("FRAME_SIZE"));
-    }
-
-    public static int getCCHTTPort() {
-        try { // TODO should we really provide a default value?
-            return Integer.parseInt(clusterProperties.getProperty("CC_HTTPPORT"));
-        } catch (NumberFormatException e) {
-            return DEFAULT_CC_HTTP_PORT;
-        }
-    }
-
-    /**
-     * Set an absolute location constraint for an operator, co-locating each
-     * partition with its input split where possible.
-     * 
-     * @param spec
-     *            the job specification
-     * @param operator
-     *            the operator to constrain
-     * @param splits
-     *            the input splits to co-locate with
-     * @throws HyracksException
-     */
-    public static void setLocationConstraint(JobSpecification spec, IOperatorDescriptor operator,
-            List<InputSplit> splits) throws HyracksException {
-        int count = splits.size();
-        String[] locations = new String[splits.size()];
-        Random random = new Random(System.currentTimeMillis());
-        for (int i = 0; i < splits.size(); i++) {
-            try {
-                String[] loc = splits.get(i).getLocations();
-                Collections.shuffle(Arrays.asList(loc), random);
-                if (loc.length > 0) {
-                    InetAddress[] allIps = InetAddress.getAllByName(loc[0]);
-                    for (InetAddress ip : allIps) {
-                        if (ipToNcMapping.get(ip.getHostAddress()) != null) {
-                            List<String> ncs = ipToNcMapping.get(ip.getHostAddress());
-                            int pos = random.nextInt(ncs.size());
-                            locations[i] = ncs.get(pos);
-                        } else {
-                            int pos = random.nextInt(NCs.length);
-                            locations[i] = NCs[pos];
-                        }
-                    }
-                } else {
-                    int pos = random.nextInt(NCs.length);
-                    locations[i] = NCs[pos];
-                }
-            } catch (IOException e) {
-                throw new HyracksException(e);
-            } catch (InterruptedException e) {
-                throw new HyracksException(e);
-            }
-        }
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, operator, locations);
-        PartitionConstraintHelper.addPartitionCountConstraint(spec, operator, count);
-    }
-
-    /**
-     * Set a partition count constraint: one partition per store on every NC.
-     * 
-     * @param spec
-     *            the job specification
-     * @param operator
-     *            the operator to constrain
-     * @throws HyracksException
-     */
-    public static void setCountConstraint(JobSpecification spec, IOperatorDescriptor operator) throws HyracksException {
-        int count = NCs.length * stores.length;
-        PartitionConstraintHelper.addPartitionCountConstraint(spec, operator, count);
-    }
-
-    public static void loadClusterConfig(String ipAddress, int port) throws HyracksException {
-        try {
-            if (hcc == null) {
-                hcc = new HyracksConnection(ipAddress, port);
-            }
-            Map<String, NodeControllerInfo> ncNameToNcInfos = new TreeMap<String, NodeControllerInfo>();
-            ncNameToNcInfos.putAll(hcc.getNodeControllerInfos());
-
-            /**
-             * remove black list nodes -- which had disk failures
-             */
-            for (String blackListNode : blackListNodes) {
-                ncNameToNcInfos.remove(blackListNode);
-            }
-
-            NCs = new String[ncNameToNcInfos.size()];
-            ipToNcMapping = new HashMap<String, List<String>>();
-            int i = 0;
-            for (Map.Entry<String, NodeControllerInfo> entry : ncNameToNcInfos.entrySet()) {
-                String ipAddr = InetAddress.getByAddress(entry.getValue().getNetworkAddress().getIpAddress())
-                        .getHostAddress();
-                List<String> matchedNCs = ipToNcMapping.get(ipAddr);
-                if (matchedNCs == null) {
-                    matchedNCs = new ArrayList<String>();
-                    ipToNcMapping.put(ipAddr, matchedNCs);
-                }
-                matchedNCs.add(entry.getKey());
-                NCs[i] = entry.getKey();
-                i++;
-            }
-
-            hdfsScheduler = new Scheduler(hcc.getNodeControllerInfos(), hcc.getClusterTopology());
-        } catch (Exception e) {
-            throw new IllegalStateException(e);
-        }
-
-        loadClusterProperties();
-        loadStores();
-    }
-
-    public static Scheduler getHdfsScheduler() {
-        return hdfsScheduler;
-    }
-
-    public static String[] getLocationConstraint() throws HyracksException {
-        int count = 0;
-        String[] locations = new String[NCs.length * stores.length];
-        for (String nc : NCs) {
-            for (int i = 0; i < stores.length; i++) {
-                locations[count] = nc;
-                count++;
-            }
-        }
-        return locations;
-    }
-    
-    /**
-     * Set the default absolute location constraint: one partition per store on
-     * every NC.
-     * 
-     * @param spec
-     *            the job specification
-     * @param operator
-     *            the operator to constrain
-     * @throws HyracksException
-     */
-    public static void setLocationConstraint(JobSpecification spec, IOperatorDescriptor operator)
-            throws HyracksException {
-        int count = 0;
-        String[] locations = new String[NCs.length * stores.length];
-        for (String nc : NCs) {
-            for (int i = 0; i < stores.length; i++) {
-                locations[count] = nc;
-                count++;
-            }
-        }
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, operator, locations);
-    }
-
-    public static String[] getNCNames() {
-        return NCs;
-    }
-
-    public static String[] getStores() {
-        return stores;
-    }
-
-    public static void addToBlackListNodes(Collection<String> nodes) {
-        blackListNodes.addAll(nodes);
-    }
-}

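ClusterConfig.getFileSplitProvider derives one split per (NC, store) pair, at the path <store>/<NC>-data/<jobId>/<indexName>. A small self-contained sketch of that layout, with hypothetical NC and store names:

// Layout sketch: one split per (NC, store) pair; all names are made up.
import java.io.File;

public class SplitLayoutDemo {
    public static void main(String[] args) {
        String[] ncs = { "nc1", "nc2" };
        String[] stores = { "/data1", "/data2" };
        String jobId = "job-42";
        String indexName = "primary";
        for (String nc : ncs) {
            for (String st : stores) {
                // mirrors st + File.separator + nc + "-data" + ... from the diff
                System.out.println(nc + " -> " + st + File.separator + nc + "-data"
                        + File.separator + jobId + File.separator + indexName);
            }
        }
    }
}
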
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/ffc967fd/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/DynamicOptimizer.java
----------------------------------------------------------------------
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/DynamicOptimizer.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/DynamicOptimizer.java
deleted file mode 100644
index 2dbaf88..0000000
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/DynamicOptimizer.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.pregelix.core.optimizer;
-
-import java.io.File;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.TreeMap;
-
-import org.apache.hadoop.io.IntWritable;
-
-import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.client.stats.Counters;
-import edu.uci.ics.hyracks.client.stats.IClusterCounterContext;
-import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
-import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.pregelix.core.jobgen.JobGen;
-import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
-
-public class DynamicOptimizer implements IOptimizer {
-
-    private IClusterCounterContext counterContext;
-    private Map<String, IntWritable> machineToDegreeOfParallelism = new TreeMap<String, IntWritable>();
-    private int dop = 0;
-
-    public DynamicOptimizer(IClusterCounterContext counterContext) {
-        this.counterContext = counterContext;
-    }
-
-    @Override
-    public JobGen optimize(JobGen jobGen, int iteration) {
-        try {
-            if (iteration == 0) {
-                initializeLoadPerMachine();
-            }
-            return jobGen;
-        } catch (Exception e) {
-            throw new IllegalStateException(e);
-        }
-    }
-
-    @Override
-    public void setOptimizedLocationConstraints(JobSpecification spec, IOperatorDescriptor operator) {
-        try {
-            String[] constraints = new String[dop];
-            int index = 0;
-            for (Entry<String, IntWritable> entry : machineToDegreeOfParallelism.entrySet()) {
-                String loc = entry.getKey();
-                IntWritable count = entry.getValue();
-                for (int j = 0; j < count.get(); j++) {
-                    constraints[index++] = loc;
-                }
-            }
-            PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, operator, constraints);
-        } catch (Exception e) {
-            throw new IllegalStateException(e);
-        }
-    }
-
-    @Override
-    public IFileSplitProvider getOptimizedFileSplitProvider(String jobId, String indexName) {
-        FileSplit[] fileSplits = new FileSplit[dop];
-        String[] stores = ClusterConfig.getStores();
-        int splitIndex = 0;
-        for (Entry<String, IntWritable> entry : machineToDegreeOfParallelism.entrySet()) {
-            String ncName = entry.getKey();
-            IntWritable count = entry.getValue();
-            for (int j = 0; j < count.get(); j++) {
-                //cycle through the stores: each machine hosts one partition per
-                //core, spread round-robin across its store directories
-                int storeCursor = j % stores.length;
-                String st = stores[storeCursor];
-                FileSplit split = new FileSplit(ncName, st + File.separator + ncName + "-data" + File.separator + jobId
-                        + File.separator + indexName + (j / stores.length));
-                fileSplits[splitIndex++] = split;
-            }
-        }
-        return new ConstantFileSplitProvider(fileSplits);
-    }
-
-    /**
-     * initialize the load-per-machine map
-     * 
-     * @return the degree of parallelism
-     * @throws HyracksException
-     */
-    private int initializeLoadPerMachine() throws HyracksException {
-        machineToDegreeOfParallelism.clear();
-        String[] locationConstraints = ClusterConfig.getLocationConstraint();
-        for (String loc : locationConstraints) {
-            machineToDegreeOfParallelism.put(loc, new IntWritable(0));
-        }
-        dop = 0;
-        for (Entry<String, IntWritable> entry : machineToDegreeOfParallelism.entrySet()) {
-            String loc = entry.getKey();
-            //use the machine's processor count as its degree of parallelism;
-            //the commented-out adjustment below would reserve cores for the heartbeat
-            int load = (int) counterContext.getCounter(Counters.NUM_PROCESSOR, false).get();
-            //load = load > 3 ? load - 2 : load;
-            IntWritable count = machineToDegreeOfParallelism.get(loc);
-            count.set(load);
-            dop += load;
-        }
-        return dop;
-    }
-
-}

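DynamicOptimizer's core move is flattening the machine-to-parallelism map into one location entry per partition slot, which then feeds PartitionConstraintHelper. A standalone sketch of the flattening step, with made-up core counts standing in for the cluster counters:

// Flattening sketch: expand {machine -> degree of parallelism} into one
// location entry per partition. Counts are hypothetical; the real code
// reads them from IClusterCounterContext.
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

public class DopFlattenDemo {
    public static void main(String[] args) {
        Map<String, Integer> machineToDop = new TreeMap<String, Integer>();
        machineToDop.put("nc1", 4); // hypothetical: 4 cores
        machineToDop.put("nc2", 2); // hypothetical: 2 cores
        int dop = 0;
        for (int count : machineToDop.values()) {
            dop += count;
        }
        String[] constraints = new String[dop];
        int index = 0;
        for (Map.Entry<String, Integer> entry : machineToDop.entrySet()) {
            for (int j = 0; j < entry.getValue(); j++) {
                constraints[index++] = entry.getKey(); // one slot per core
            }
        }
        System.out.println(Arrays.toString(constraints));
        // [nc1, nc1, nc1, nc1, nc2, nc2]
    }
}
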
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/ffc967fd/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/IOptimizer.java
----------------------------------------------------------------------
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/IOptimizer.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/IOptimizer.java
deleted file mode 100644
index c8856aa..0000000
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/IOptimizer.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.pregelix.core.optimizer;
-
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.pregelix.core.jobgen.JobGen;
-
-public interface IOptimizer {
-
-    public JobGen optimize(JobGen jobGen, int iteration);
-
-    public void setOptimizedLocationConstraints(JobSpecification spec, IOperatorDescriptor operator);
-
-    public IFileSplitProvider getOptimizedFileSplitProvider(String jobId, String indexName);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/ffc967fd/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/NoOpOptimizer.java
----------------------------------------------------------------------
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/NoOpOptimizer.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/NoOpOptimizer.java
deleted file mode 100644
index cd0ca37..0000000
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/NoOpOptimizer.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.pregelix.core.optimizer;
-
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.pregelix.core.jobgen.JobGen;
-import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
-
-public class NoOpOptimizer implements IOptimizer {
-
-    @Override
-    public JobGen optimize(JobGen jobGen, int iteration) {
-        return jobGen;
-    }
-
-    @Override
-    public void setOptimizedLocationConstraints(JobSpecification spec, IOperatorDescriptor operator) {
-        try {
-            ClusterConfig.setLocationConstraint(spec, operator);
-        } catch (Exception e) {
-            throw new IllegalStateException(e);
-        }
-    }
-
-    @Override
-    public IFileSplitProvider getOptimizedFileSplitProvider(String jobId, String indexName) {
-        try {
-            return ClusterConfig.getFileSplitProvider(jobId, indexName);
-        } catch (Exception e) {
-            throw new IllegalStateException(e);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/ffc967fd/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/runtime/touchpoint/RawBinaryComparatorFactory.java
----------------------------------------------------------------------
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/runtime/touchpoint/RawBinaryComparatorFactory.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/runtime/touchpoint/RawBinaryComparatorFactory.java
deleted file mode 100644
index 448a80f..0000000
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/runtime/touchpoint/RawBinaryComparatorFactory.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.pregelix.core.runtime.touchpoint;
-
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-
-public class RawBinaryComparatorFactory implements IBinaryComparatorFactory {
-
-    private static final long serialVersionUID = 1L;
-    public static final IBinaryComparatorFactory INSTANCE = new RawBinaryComparatorFactory();
-
-    private RawBinaryComparatorFactory() {
-    }
-
-    @Override
-    public IBinaryComparator createBinaryComparator() {
-        return new IBinaryComparator() {
-
-            @Override
-            public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
-                if (b1 == b2 && s1 == s2 && l1 == l2) {
-                    return 0;
-                }
-                int commonLength = Math.min(l1, l2);
-                for (int i = 0; i < commonLength; i++) {
-                    if (b1[s1 + i] != b2[s2 + i]) {
-                        return (b1[s1 + i] & 0xff) - (b2[s2 + i] & 0xff);
-                    }
-                }
-                int difference = l1 - l2;
-                return difference == 0 ? 0 : (difference > 0 ? 1 : -1);
-            }
-
-        };
-    }
-}


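The comparator above orders raw bytes as unsigned values and, on a common prefix, puts the shorter input first. A standalone sketch reproducing those semantics:

// Standalone reproduction of the raw byte-wise comparison semantics.
public class RawCompareDemo {
    static int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        int commonLength = Math.min(l1, l2);
        for (int i = 0; i < commonLength; i++) {
            if (b1[s1 + i] != b2[s2 + i]) {
                // unsigned comparison: 0xff sorts after 0x7f
                return (b1[s1 + i] & 0xff) - (b2[s2 + i] & 0xff);
            }
        }
        // equal prefixes: the shorter input sorts first
        return Integer.signum(l1 - l2);
    }

    public static void main(String[] args) {
        byte[] a = { 0x01, (byte) 0xff };
        byte[] b = { 0x01, 0x7f };
        System.out.println(compare(a, 0, a.length, b, 0, b.length) > 0); // true
        System.out.println(compare(a, 0, 1, b, 0, 2) < 0); // true
    }
}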