asterixdb-commits mailing list archives

From amo...@apache.org
Subject [15/21] incubator-asterixdb git commit: First stage of external data cleanup
Date Sun, 03 Jan 2016 17:41:13 GMT
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HiveObjectParser.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HiveObjectParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HiveObjectParser.java
deleted file mode 100644
index d7fa4f2..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HiveObjectParser.java
+++ /dev/null
@@ -1,426 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.dataflow;
-
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.serde.Constants;
-import org.apache.hadoop.hive.serde2.SerDe;
-import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
-import org.apache.hadoop.io.Writable;
-
-import org.apache.asterix.builders.IARecordBuilder;
-import org.apache.asterix.builders.OrderedListBuilder;
-import org.apache.asterix.builders.RecordBuilder;
-import org.apache.asterix.builders.UnorderedListBuilder;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.om.base.temporal.GregorianCalendarSystem;
-import org.apache.asterix.om.types.AOrderedListType;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.AUnionType;
-import org.apache.asterix.om.types.AUnorderedListType;
-import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.util.NonTaggedFormatUtil;
-import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
-import org.apache.hyracks.util.string.UTF8StringWriter;
-
-@SuppressWarnings("deprecation")
-public class HiveObjectParser implements IAsterixHDFSRecordParser {
-
-    private static final String KEY_HIVE_SERDE = "hive-serde";
-    private ARecordType aRecord;
-    private SerDe hiveSerde;
-    private StructObjectInspector oi;
-    private IARecordBuilder recBuilder;
-    private ArrayBackedValueStorage fieldValueBuffer;
-    private ArrayBackedValueStorage listItemBuffer;
-    private byte[] fieldTypeTags;
-    private IAType[] fieldTypes;
-    private OrderedListBuilder orderedListBuilder;
-    private UnorderedListBuilder unorderedListBuilder;
-    private boolean initialized = false;
-    private List<StructField> fieldRefs;
-    private UTF8StringWriter utf8Writer = new UTF8StringWriter();
-
-    @SuppressWarnings({ "unchecked" })
-    @Override
-    public void initialize(ARecordType record, Map<String, String> arguments, Configuration hadoopConfig)
-            throws Exception {
-        if (!initialized) {
-            this.aRecord = record;
-            int n = record.getFieldNames().length;
-            fieldTypes = record.getFieldTypes();
-
-            //create the hive table schema.
-            Properties tbl = new Properties();
-            tbl.put(Constants.LIST_COLUMNS, getCommaDelimitedColNames(record));
-            tbl.put(Constants.LIST_COLUMN_TYPES, getColTypes(record));
-            String hiveSerdeClassName = arguments.get(KEY_HIVE_SERDE);
-            if (hiveSerdeClassName == null) {
-                throw new IllegalArgumentException("no hive serde provided for hive deserialized records");
-            }
-            hiveSerde = (SerDe) Class.forName(hiveSerdeClassName).newInstance();
-            hiveSerde.initialize(hadoopConfig, tbl);
-            oi = (StructObjectInspector) hiveSerde.getObjectInspector();
-
-            fieldValueBuffer = new ArrayBackedValueStorage();
-            listItemBuffer = new ArrayBackedValueStorage();
-            recBuilder = new RecordBuilder();
-            recBuilder.reset(record);
-            recBuilder.init();
-            fieldTypeTags = new byte[n];
-            for (int i = 0; i < n; i++) {
-                ATypeTag tag = record.getFieldTypes()[i].getTypeTag();
-                fieldTypeTags[i] = tag.serialize();
-            }
-            fieldRefs = (List<StructField>) oi.getAllStructFieldRefs();
-            initialized = true;
-        }
-    }
-
-    private String getColTypes(ARecordType record) throws Exception {
-        int n = record.getFieldTypes().length;
-        if (n < 1) {
-            throw new HyracksDataException("Failed to get columns of record");
-        }
-        ATypeTag tag = null;
-
-        //First Column
-        if (record.getFieldTypes()[0].getTypeTag() == ATypeTag.UNION) {
-            if (!NonTaggedFormatUtil.isOptional(record.getFieldTypes()[0])) {
-                throw new NotImplementedException("Non-optional UNION type is not supported.");
-            }
-            tag = ((AUnionType) record.getFieldTypes()[0]).getNullableType().getTypeTag();
-        } else {
-            tag = record.getFieldTypes()[0].getTypeTag();
-        }
-        if (tag == null) {
-            throw new NotImplementedException("Failed to get the type information for field " + 0 + ".");
-        }
-        String cols = getHiveTypeString(tag);
-
-        for (int i = 1; i < n; i++) {
-            tag = null;
-            if (record.getFieldTypes()[i].getTypeTag() == ATypeTag.UNION) {
-                if (!NonTaggedFormatUtil.isOptional(record.getFieldTypes()[i])) {
-                    throw new NotImplementedException("Non-optional UNION type is not supported.");
-                }
-                tag = ((AUnionType) record.getFieldTypes()[i]).getNullableType().getTypeTag();
-            } else {
-                tag = record.getFieldTypes()[i].getTypeTag();
-            }
-            if (tag == null) {
-                throw new NotImplementedException("Failed to get the type information for field " + i + ".");
-            }
-            cols = cols + "," + getHiveTypeString(tag);
-        }
-        return cols;
-    }
-
-    private String getCommaDelimitedColNames(ARecordType record) throws Exception {
-        if (record.getFieldNames().length < 1) {
-            throw new HyracksDataException("Can't deserialize hive records with no closed columns");
-        }
-
-        String cols = record.getFieldNames()[0];
-        for (int i = 1; i < record.getFieldNames().length; i++) {
-            cols = cols + "," + record.getFieldNames()[i];
-        }
-        return cols;
-    }
-
-    private String getHiveTypeString(ATypeTag tag) throws Exception {
-        switch (tag) {
-            case BOOLEAN:
-                return Constants.BOOLEAN_TYPE_NAME;
-            case DATE:
-                return Constants.DATE_TYPE_NAME;
-            case DATETIME:
-                return Constants.DATETIME_TYPE_NAME;
-            case DOUBLE:
-                return Constants.DOUBLE_TYPE_NAME;
-            case FLOAT:
-                return Constants.FLOAT_TYPE_NAME;
-            case INT16:
-                return Constants.SMALLINT_TYPE_NAME;
-            case INT32:
-                return Constants.INT_TYPE_NAME;
-            case INT64:
-                return Constants.BIGINT_TYPE_NAME;
-            case INT8:
-                return Constants.TINYINT_TYPE_NAME;
-            case ORDEREDLIST:
-                return Constants.LIST_TYPE_NAME;
-            case STRING:
-                return Constants.STRING_TYPE_NAME;
-            case TIME:
-                return Constants.DATETIME_TYPE_NAME;
-            case UNORDEREDLIST:
-                return Constants.LIST_TYPE_NAME;
-            default:
-                throw new HyracksDataException("Can't get hive type for field of type " + tag);
-        }
-    }
-
-    @Override
-    public void parse(Object object, DataOutput output) throws Exception {
-        if (object == null) {
-            throw new HyracksDataException("Hive parser can't parse null objects");
-        }
-        Object hiveObject = hiveSerde.deserialize((Writable) object);
-        int n = aRecord.getFieldNames().length;
-        List<Object> attributesValues = oi.getStructFieldsDataAsList(hiveObject);
-        recBuilder.reset(aRecord);
-        recBuilder.init();
-        for (int i = 0; i < n; i++) {
-            fieldValueBuffer.reset();
-            fieldValueBuffer.getDataOutput().writeByte(fieldTypeTags[i]);
-            ObjectInspector foi = fieldRefs.get(i).getFieldObjectInspector();
-            //get field type
-            switch (fieldTypes[i].getTypeTag()) {
-                case BOOLEAN:
-                    parseBoolean(attributesValues.get(i), (BooleanObjectInspector) foi,
-                            fieldValueBuffer.getDataOutput());
-                    break;
-                case TIME:
-                    parseTime(attributesValues.get(i), (TimestampObjectInspector) foi, fieldValueBuffer.getDataOutput());
-                    break;
-                case DATE:
-                    parseDate(attributesValues.get(i), (TimestampObjectInspector) foi, fieldValueBuffer.getDataOutput());
-                    break;
-                case DATETIME:
-                    parseDateTime(attributesValues.get(i), (TimestampObjectInspector) foi,
-                            fieldValueBuffer.getDataOutput());
-                    break;
-                case DOUBLE:
-                    parseDouble(attributesValues.get(i), (DoubleObjectInspector) foi, fieldValueBuffer.getDataOutput());
-                    break;
-                case FLOAT:
-                    parseFloat(attributesValues.get(i), (FloatObjectInspector) foi, fieldValueBuffer.getDataOutput());
-                    break;
-                case INT8:
-                    parseInt8(attributesValues.get(i), (ByteObjectInspector) foi, fieldValueBuffer.getDataOutput());
-                    break;
-                case INT16:
-                    parseInt16(attributesValues.get(i), (ShortObjectInspector) foi, fieldValueBuffer.getDataOutput());
-                    break;
-                case INT32:
-                    parseInt32(attributesValues.get(i), (IntObjectInspector) foi, fieldValueBuffer.getDataOutput());
-                    break;
-                case INT64:
-                    parseInt64(attributesValues.get(i), (LongObjectInspector) foi, fieldValueBuffer.getDataOutput());
-                    break;
-                case STRING:
-                    parseString(attributesValues.get(i), (StringObjectInspector) foi, fieldValueBuffer.getDataOutput());
-                    break;
-                case ORDEREDLIST:
-                    parseOrderedList((AOrderedListType) fieldTypes[i], attributesValues.get(i),
-                            (ListObjectInspector) foi);
-                    break;
-                case UNORDEREDLIST:
-                    parseUnorderedList((AUnorderedListType) fieldTypes[i], attributesValues.get(i),
-                            (ListObjectInspector) foi);
-                    break;
-                default:
-                    throw new HyracksDataException("Can't get hive type for field of type "
-                            + fieldTypes[i].getTypeTag());
-            }
-            recBuilder.addField(i, fieldValueBuffer);
-        }
-        recBuilder.write(output, true);
-    }
-
-    private void parseInt64(Object obj, LongObjectInspector foi, DataOutput dataOutput) throws IOException {
-        dataOutput.writeLong(foi.get(obj));
-    }
-
-    private void parseInt32(Object obj, IntObjectInspector foi, DataOutput dataOutput) throws IOException {
-        if (obj == null) {
-            throw new HyracksDataException("can't parse null field");
-        }
-        dataOutput.writeInt(foi.get(obj));
-    }
-
-    private void parseInt16(Object obj, ShortObjectInspector foi, DataOutput dataOutput) throws IOException {
-        dataOutput.writeShort(foi.get(obj));
-    }
-
-    private void parseFloat(Object obj, FloatObjectInspector foi, DataOutput dataOutput) throws IOException {
-        dataOutput.writeFloat(foi.get(obj));
-    }
-
-    private void parseDouble(Object obj, DoubleObjectInspector foi, DataOutput dataOutput) throws IOException {
-        dataOutput.writeDouble(foi.get(obj));
-    }
-
-    private void parseDateTime(Object obj, TimestampObjectInspector foi, DataOutput dataOutput) throws IOException {
-        dataOutput.writeLong(foi.getPrimitiveJavaObject(obj).getTime());
-    }
-
-    private void parseDate(Object obj, TimestampObjectInspector foi, DataOutput dataOutput) throws IOException {
-        long chrononTimeInMs = foi.getPrimitiveJavaObject(obj).getTime();
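-        // floor toward negative infinity: a pre-epoch timestamp that does not fall
-        // exactly on a day boundary belongs to the previous day, so subtract one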
-        short temp = 0;
-        if (chrononTimeInMs < 0 && chrononTimeInMs % GregorianCalendarSystem.CHRONON_OF_DAY != 0) {
-            temp = 1;
-        }
-        dataOutput.writeInt((int) (chrononTimeInMs / GregorianCalendarSystem.CHRONON_OF_DAY) - temp);
-    }
-
-    private void parseBoolean(Object obj, BooleanObjectInspector foi, DataOutput dataOutput) throws IOException {
-        dataOutput.writeBoolean(foi.get(obj));
-    }
-
-    private void parseInt8(Object obj, ByteObjectInspector foi, DataOutput dataOutput) throws IOException {
-        dataOutput.writeByte(foi.get(obj));
-    }
-
-    private void parseString(Object obj, StringObjectInspector foi, DataOutput dataOutput) throws IOException {
-        utf8Writer.writeUTF8(foi.getPrimitiveJavaObject(obj), dataOutput);
-    }
-
-    private void parseTime(Object obj, TimestampObjectInspector foi, DataOutput dataOutput) throws IOException {
-        dataOutput.writeInt((int) (foi.getPrimitiveJavaObject(obj).getTime() % GregorianCalendarSystem.CHRONON_OF_DAY));
-    }
-
-    private void parseOrderedList(AOrderedListType aOrderedListType, Object obj, ListObjectInspector foi)
-            throws IOException {
-        OrderedListBuilder orderedListBuilder = getOrderedListBuilder();
-        IAType itemType = null;
-        if (aOrderedListType != null) {
-            itemType = aOrderedListType.getItemType();
-        }
-        orderedListBuilder.reset(aOrderedListType);
-
-        int n = foi.getListLength(obj);
-        for (int i = 0; i < n; i++) {
-            Object element = foi.getListElement(obj, i);
-            ObjectInspector eoi = foi.getListElementObjectInspector();
-            if (element == null) {
-                throw new HyracksDataException("can't parse hive list with null values");
-            }
-            listItemBuffer.reset();
-            parseHiveListItem(element, eoi, listItemBuffer, itemType);
-            orderedListBuilder.addItem(listItemBuffer);
-        }
-        orderedListBuilder.write(fieldValueBuffer.getDataOutput(), true);
-    }
-
-    private void parseUnorderedList(AUnorderedListType uoltype, Object obj, ListObjectInspector oi) throws IOException,
-            AsterixException {
-        UnorderedListBuilder unorderedListBuilder = getUnorderedListBuilder();
-        IAType itemType = null;
-        if (uoltype != null) {
-            itemType = uoltype.getItemType();
-        }
-        byte tagByte = itemType.getTypeTag().serialize();
-        unorderedListBuilder.reset(uoltype);
-
-        int n = oi.getListLength(obj);
-        for (int i = 0; i < n; i++) {
-            Object element = oi.getListElement(obj, i);
-            ObjectInspector eoi = oi.getListElementObjectInspector();
-            if (element == null) {
-                throw new HyracksDataException("can't parse hive list with null values");
-            }
-            listItemBuffer.reset();
-            listItemBuffer.getDataOutput().writeByte(tagByte);
-            parseHiveListItem(element, eoi, listItemBuffer, itemType);
-            unorderedListBuilder.addItem(listItemBuffer);
-        }
-        unorderedListBuilder.write(fieldValueBuffer.getDataOutput(), true);
-    }
-
-    private void parseHiveListItem(Object obj, ObjectInspector eoi, ArrayBackedValueStorage fieldValueBuffer,
-            IAType itemType) throws IOException {
-        //get field type
-        switch (itemType.getTypeTag()) {
-            case BOOLEAN:
-                parseBoolean(obj, (BooleanObjectInspector) eoi, fieldValueBuffer.getDataOutput());
-                break;
-            case TIME:
-                parseTime(obj, (TimestampObjectInspector) eoi, fieldValueBuffer.getDataOutput());
-                break;
-            case DATE:
-                parseDate(obj, (TimestampObjectInspector) eoi, fieldValueBuffer.getDataOutput());
-                break;
-            case DATETIME:
-                parseDateTime(obj, (TimestampObjectInspector) eoi, fieldValueBuffer.getDataOutput());
-                break;
-            case DOUBLE:
-                parseDouble(obj, (DoubleObjectInspector) eoi, fieldValueBuffer.getDataOutput());
-                break;
-            case FLOAT:
-                parseFloat(obj, (FloatObjectInspector) eoi, fieldValueBuffer.getDataOutput());
-                break;
-            case INT8:
-                parseInt8(obj, (ByteObjectInspector) eoi, fieldValueBuffer.getDataOutput());
-                break;
-            case INT16:
-                parseInt16(obj, (ShortObjectInspector) eoi, fieldValueBuffer.getDataOutput());
-                break;
-            case INT32:
-                parseInt32(obj, (IntObjectInspector) eoi, fieldValueBuffer.getDataOutput());
-                break;
-            case INT64:
-                parseInt64(obj, (LongObjectInspector) eoi, fieldValueBuffer.getDataOutput());
-                break;
-            case STRING:
-                parseString(obj, (StringObjectInspector) eoi, fieldValueBuffer.getDataOutput());
-                break;
-            default:
-                throw new HyracksDataException("doesn't support hive data with list of non-primitive types");
-        }
-    }
-
-    private OrderedListBuilder getOrderedListBuilder() {
-        if (orderedListBuilder == null) {
-            orderedListBuilder = new OrderedListBuilder();
-        }
-        return orderedListBuilder;
-    }
-
-    private UnorderedListBuilder getUnorderedListBuilder() {
-        if (unorderedListBuilder == null) {
-            unorderedListBuilder = new UnorderedListBuilder();
-        }
-        return unorderedListBuilder;
-    }
-
-}
\ No newline at end of file

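For context: the class removed above is driven through the IAsterixHDFSRecordParser contract removed in the next hunk — initialize() is called once to build the Hive table schema, parse() once per record. A minimal usage sketch, assuming a caller-supplied record type, raw Writable, and output; the "hive-serde" key and HiveObjectParser come from this diff, while the surrounding method and the example SerDe class name are illustrative:

    import java.io.DataOutput;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.asterix.om.types.ARecordType;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Writable;

    public class HiveParserUsageSketch {
        // Hypothetical helper showing the initialize()/parse() handshake.
        public static void parseOne(ARecordType recordType, Writable rawRecord, DataOutput out) throws Exception {
            Map<String, String> arguments = new HashMap<String, String>();
            // "hive-serde" is the KEY_HIVE_SERDE argument the parser reads;
            // ColumnarSerDe is just one example of a Hive SerDe class.
            arguments.put("hive-serde", "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe");
            IAsterixHDFSRecordParser parser = new HiveObjectParser();
            parser.initialize(recordType, arguments, new Configuration()); // builds the Hive schema once
            parser.parse(rawRecord, out); // deserializes via the SerDe and writes one ADM record
        }
    }
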
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/IAsterixHDFSRecordParser.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/IAsterixHDFSRecordParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/IAsterixHDFSRecordParser.java
deleted file mode 100644
index ff5bc27..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/IAsterixHDFSRecordParser.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.dataflow;
-
-import java.io.DataOutput;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-
-import org.apache.asterix.om.types.ARecordType;
-
-/**
- * This interface is provided for users to implement in order to support their own
- * input parsing. It should eventually be included in the external library.
- *
- * @author alamouda
- */
-public interface IAsterixHDFSRecordParser {
-
-    /**
-     * This method is called once, when the parser is created, before any objects are parsed.
-     * @param record
-     *  The description of the expected dataset record.
-     * @param arguments
-     *  The arguments passed when creating the external dataset.
-     * @param hadoopConfig
-     *  The Hadoop configuration used to initialize the parser.
-     */
-    public void initialize(ARecordType record, Map<String, String> arguments, Configuration hadoopConfig) throws Exception;
-    
-    /**
-     * This function takes an object, parses it, and then serializes it as an ADM record into the output buffer.
-     * @param object
-     *  the serialized I/O object
-     * @param output
-     *  the output buffer to which the parsed record is serialized
-     */
-    public void parse(Object object, DataOutput output) throws Exception;
-
-}

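As a rough illustration of this contract, a trivial implementation (hypothetical, not part of this commit) might look as follows; a real parser builds per-field serializers in initialize() and emits a tagged binary ADM record in parse(), as HiveObjectParser above does:

    import java.io.DataOutput;
    import java.util.Map;

    import org.apache.asterix.om.types.ARecordType;
    import org.apache.hadoop.conf.Configuration;

    // Hypothetical no-op implementation of the removed interface.
    public class EchoRecordParser implements IAsterixHDFSRecordParser {
        @Override
        public void initialize(ARecordType record, Map<String, String> arguments, Configuration hadoopConfig) {
            // a real parser would derive per-field type tags and serializers from 'record' here
        }

        @Override
        public void parse(Object object, DataOutput output) throws Exception {
            // for illustration only: write the object's string form;
            // a real parser writes a binary ADM record
            output.write(String.valueOf(object).getBytes("UTF-8"));
        }
    }
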
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/IControlledTupleParser.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/IControlledTupleParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/IControlledTupleParser.java
deleted file mode 100644
index c8fc6c2..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/IControlledTupleParser.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.dataflow;
-
-import java.nio.ByteBuffer;
-
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-/**
- * This interface is to be implemented by parsers used in a pipelined Hyracks job where the input is not available all at once.
- */
-public interface IControlledTupleParser {
-    /**
-     * This function should flush the tuples remaining in the frame writer's buffer
-     * and free all resources.
-     */
-    public void close(IFrameWriter writer) throws Exception;
-
-    /**
-     * This function is called whenever more data is ready for parsing in the input stream.
-     * @param writer
-     *          a frame writer that is used to push outgoing frames
-     * @param frameBuffer 
-     *          a frame buffer containing the incoming tuples, used for propagating fields.
-     */
-    public void parseNext(IFrameWriter writer, ByteBuffer frameBuffer) throws HyracksDataException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/IControlledTupleParserFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/IControlledTupleParserFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/IControlledTupleParserFactory.java
deleted file mode 100644
index 52d5123..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/IControlledTupleParserFactory.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.dataflow;
-
-public interface IControlledTupleParserFactory {
-    public IControlledTupleParser createTupleParser();
-}

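Taken together, the two interfaces above form the extension point of the lookup pipeline: a Hyracks job is configured with the factory, and each task obtains its parser from it. A hedged skeleton covering both (hypothetical class, not part of this commit):

    import java.nio.ByteBuffer;

    import org.apache.hyracks.api.comm.IFrameWriter;
    import org.apache.hyracks.api.exceptions.HyracksDataException;

    // Hypothetical skeleton pairing the factory with a do-nothing controlled parser.
    public class NoOpControlledTupleParserFactory implements IControlledTupleParserFactory {
        @Override
        public IControlledTupleParser createTupleParser() {
            return new IControlledTupleParser() {
                @Override
                public void close(IFrameWriter writer) throws Exception {
                    // flush any tuples buffered for 'writer' and release the underlying reader
                }

                @Override
                public void parseNext(IFrameWriter writer, ByteBuffer frameBuffer) throws HyracksDataException {
                    // a real parser reads record ids from frameBuffer and appends looked-up records
                }
            };
        }
    }
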
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/IndexingScheduler.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/IndexingScheduler.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/IndexingScheduler.java
deleted file mode 100644
index 2a51380..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/IndexingScheduler.java
+++ /dev/null
@@ -1,348 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.dataflow;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Random;
-import java.util.logging.Logger;
-
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hyracks.api.client.HyracksConnection;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.client.NodeControllerInfo;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.exceptions.HyracksException;
-
-public class IndexingScheduler {
-    private static final Logger LOGGER = Logger.getLogger(IndexingScheduler.class.getName());
-
-    /** a list of NCs */
-    private String[] NCs;
-
-    /** a map from ip to NCs */
-    private Map<String, List<String>> ipToNcMapping = new HashMap<String, List<String>>();
-
-    /** a map from the NC name to the index */
-    private Map<String, Integer> ncNameToIndex = new HashMap<String, Integer>();
-
-    /** a map from NC name to the NodeControllerInfo */
-    private Map<String, NodeControllerInfo> ncNameToNcInfos;
-
-    /**
-     * The constructor of the scheduler.
-     *
-     * @param ipAddress
-     *            the IP address of the Hyracks cluster controller
-     * @param port
-     *            the port of the Hyracks cluster controller
-     * @throws HyracksException
-     */
-    public IndexingScheduler(String ipAddress, int port) throws HyracksException {
-        try {
-            IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);
-            this.ncNameToNcInfos = hcc.getNodeControllerInfos();
-            loadIPAddressToNCMap(ncNameToNcInfos);
-        } catch (Exception e) {
-            throw new HyracksException(e);
-        }
-    }
-
-    /**
-     * Set location constraints for a file scan operator with a list of file
-     * splits. It tries to assign splits to their local machines fairly;
-     * locality takes precedence over fairness.
-     *
-     * @throws HyracksDataException
-     */
-    public String[] getLocationConstraints(InputSplit[] splits) throws HyracksException {
-        if (splits == null) {
-            /** handle the case where the splits array is null */
-            return new String[] {};
-        }
-        int[] workloads = new int[NCs.length];
-        Arrays.fill(workloads, 0);
-        String[] locations = new String[splits.length];
-        Map<String, IntWritable> locationToNumOfSplits = new HashMap<String, IntWritable>();
-        /**
-         * upper bound is number of splits
-         */
-        int upperBoundSlots = splits.length;
-
-        try {
-            Random random = new Random(System.currentTimeMillis());
-            boolean[] scheduled = new boolean[splits.length];
-            Arrays.fill(scheduled, false);
-            /**
-             * scan the splits and build the popularity map
-             * give the machines with less local splits more scheduling priority
-             */
-            buildPopularityMap(splits, locationToNumOfSplits);
-            HashMap<String, Integer> locationToNumOfAssignement = new HashMap<String, Integer>();
-            for (String location : locationToNumOfSplits.keySet()) {
-                locationToNumOfAssignement.put(location, 0);
-            }
-            /**
-             * push data-local upper-bounds slots to each machine
-             */
-            scheduleLocalSlots(splits, workloads, locations, upperBoundSlots, random, scheduled, locationToNumOfSplits,
-                    locationToNumOfAssignement);
-
-            int dataLocalCount = 0;
-            for (int i = 0; i < scheduled.length; i++) {
-                if (scheduled[i]) {
-                    dataLocalCount++;
-                }
-            }
-            LOGGER.info("Data local rate: "
-                    + (scheduled.length == 0 ? 0.0 : ((float) dataLocalCount / (float) (scheduled.length))));
-            /**
-             * push non-data-local upper-bounds slots to each machine
-             */
-            locationToNumOfAssignement.clear();
-            for (String nc : NCs) {
-                locationToNumOfAssignement.put(nc, 0);
-            }
-            for (int i = 0; i < scheduled.length; i++) {
-                if (scheduled[i]) {
-                    locationToNumOfAssignement.put(locations[i], locationToNumOfAssignement.get(locations[i]) + 1);
-                }
-            }
-
-            scheduleNonLocalSlots(splits, workloads, locations, upperBoundSlots, scheduled, locationToNumOfAssignement);
-            return locations;
-        } catch (IOException e) {
-            throw new HyracksException(e);
-        }
-    }
-
-    /**
-     * Schedule non-local slots to each machine
-     *
-     * @param splits
-     *            The HDFS file splits.
-     * @param workloads
-     *            The current capacity of each machine.
-     * @param locations
-     *            The result schedule.
-     * @param slotLimit
-     *            The maximum slots of each machine.
-     * @param scheduled
-     *            Indicate which slot is scheduled.
-     * @param locationToNumOfAssignement
-     *            The number of splits already assigned to each location.
-     */
-    private void scheduleNonLocalSlots(InputSplit[] splits, final int[] workloads, String[] locations, int slotLimit,
-            boolean[] scheduled, final HashMap<String, Integer> locationToNumOfAssignement)
-                    throws IOException, UnknownHostException {
-
-        PriorityQueue<String> scheduleCandidates = new PriorityQueue<String>(NCs.length, new Comparator<String>() {
-            @Override
-            public int compare(String s1, String s2) {
-                return locationToNumOfAssignement.get(s1).compareTo(locationToNumOfAssignement.get(s2));
-            }
-
-        });
-
-        for (String nc : NCs) {
-            scheduleCandidates.add(nc);
-        }
-        /**
-         * schedule non-local file reads
-         */
-        for (int i = 0; i < splits.length; i++) {
-            /** if a split has no data-local NC, assign it to the least-loaded NC */
-            if (!scheduled[i]) {
-                String selectedNcName = scheduleCandidates.remove();
-                if (selectedNcName != null) {
-                    int ncIndex = ncNameToIndex.get(selectedNcName);
-                    workloads[ncIndex]++;
-                    scheduled[i] = true;
-                    locations[i] = selectedNcName;
-                    locationToNumOfAssignement.put(selectedNcName, workloads[ncIndex]);
-                    scheduleCandidates.add(selectedNcName);
-                }
-            }
-        }
-    }
-
-    /**
-     * Schedule data-local slots to each machine.
-     *
-     * @param splits
-     *            The HDFS file splits.
-     * @param workloads
-     *            The current capacity of each machine.
-     * @param locations
-     *            The result schedule.
-     * @param slots
-     *            The maximum slots of each machine.
-     * @param random
-     *            The random generator.
-     * @param scheduled
-     *            Indicate which slot is scheduled.
-     * @throws IOException
-     * @throws UnknownHostException
-     */
-    private void scheduleLocalSlots(InputSplit[] splits, int[] workloads, String[] locations, int slots, Random random,
-            boolean[] scheduled, final Map<String, IntWritable> locationToNumSplits,
-            final HashMap<String, Integer> locationToNumOfAssignement) throws IOException, UnknownHostException {
-        /** scheduling candidates will be ordered inversely according to their popularity */
-        PriorityQueue<String> scheduleCandidates = new PriorityQueue<String>(3, new Comparator<String>() {
-            @Override
-            public int compare(String s1, String s2) {
-                int assignmentDifference = locationToNumOfAssignement.get(s1)
-                        .compareTo(locationToNumOfAssignement.get(s2));
-                if (assignmentDifference != 0) {
-                    return assignmentDifference;
-                }
-                return locationToNumSplits.get(s1).compareTo(locationToNumSplits.get(s2));
-            }
-
-        });
-
-        for (int i = 0; i < splits.length; i++) {
-            if (scheduled[i]) {
-                continue;
-            }
-            /**
-             * get the locations of this split
-             */
-            String[] locs = splits[i].getLocations();
-            if (locs.length > 0) {
-                scheduleCandidates.clear();
-                for (int j = 0; j < locs.length; j++) {
-                    scheduleCandidates.add(locs[j]);
-                }
-
-                /** a PriorityQueue's iterator is not ordered, so poll candidates in priority order */
-                while (!scheduleCandidates.isEmpty()) {
-                    String candidate = scheduleCandidates.poll();
-                    /**
-                     * get all the IP addresses from the name
-                     */
-                    InetAddress[] allIps = InetAddress.getAllByName(candidate);
-                    /**
-                     * iterate over all IPs
-                     */
-                    for (InetAddress ip : allIps) {
-                        /**
-                         * if the node controller exists
-                         */
-                        if (ipToNcMapping.get(ip.getHostAddress()) != null) {
-                            /**
-                             * set the ncs
-                             */
-                            List<String> dataLocations = ipToNcMapping.get(ip.getHostAddress());
-                            int arrayPos = random.nextInt(dataLocations.size());
-                            String nc = dataLocations.get(arrayPos);
-                            int pos = ncNameToIndex.get(nc);
-                            /**
-                             * check if the node is already full
-                             */
-                            if (workloads[pos] < slots) {
-                                locations[i] = nc;
-                                workloads[pos]++;
-                                scheduled[i] = true;
-                                locationToNumOfAssignement.put(candidate,
-                                        locationToNumOfAssignement.get(candidate) + 1);
-                                break;
-                            }
-                        }
-                    }
-                    /**
-                     * break the loop for data-locations if the schedule has
-                     * already been found
-                     */
-                    if (scheduled[i]) {
-                        break;
-                    }
-                }
-            }
-        }
-    }
-
-    /**
-     * Scan the splits once and build a popularity map
-     *
-     * @param splits
-     *            the split array
-     * @param locationToNumOfSplits
-     *            the map to be built
-     * @throws IOException
-     */
-    private void buildPopularityMap(InputSplit[] splits, Map<String, IntWritable> locationToNumOfSplits)
-            throws IOException {
-        for (InputSplit split : splits) {
-            String[] locations = split.getLocations();
-            for (String loc : locations) {
-                IntWritable locCount = locationToNumOfSplits.get(loc);
-                if (locCount == null) {
-                    locCount = new IntWritable(0);
-                    locationToNumOfSplits.put(loc, locCount);
-                }
-                locCount.set(locCount.get() + 1);
-            }
-        }
-    }
-
-    /**
-     * Load the IP-address-to-NC map from the NCNameToNCInfoMap
-     *
-     * @param ncNameToNcInfos
-     * @throws HyracksException
-     */
-    private void loadIPAddressToNCMap(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
-        try {
-            NCs = new String[ncNameToNcInfos.size()];
-            ipToNcMapping.clear();
-            ncNameToIndex.clear();
-            int i = 0;
-
-            /**
-             * build the IP address to NC map
-             */
-            for (Map.Entry<String, NodeControllerInfo> entry : ncNameToNcInfos.entrySet()) {
-                String ipAddr = InetAddress.getByAddress(entry.getValue().getNetworkAddress().lookupIpAddress())
-                        .getHostAddress();
-                List<String> matchedNCs = ipToNcMapping.get(ipAddr);
-                if (matchedNCs == null) {
-                    matchedNCs = new ArrayList<String>();
-                    ipToNcMapping.put(ipAddr, matchedNCs);
-                }
-                matchedNCs.add(entry.getKey());
-                NCs[i] = entry.getKey();
-                i++;
-            }
-
-            /**
-             * set up the NC name to index mapping
-             */
-            for (i = 0; i < NCs.length; i++) {
-                ncNameToIndex.put(NCs[i], i);
-            }
-        } catch (Exception e) {
-            throw new HyracksException(e);
-        }
-    }
-}

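The scheduler above assigns splits in two passes: data-local slots first (scheduleLocalSlots), then remaining splits to the least-loaded NCs (scheduleNonLocalSlots), with a popularity map giving less-popular hosts priority. A compact restatement of the buildPopularityMap idea with plain collections (illustrative, not the class above):

    import java.util.HashMap;
    import java.util.Map;

    public class PopularityMapSketch {
        // Count, per host, how many splits report that host as a replica location.
        // Hosts that hold fewer splits get scheduling priority, which is what the
        // comparators in the deleted class encode.
        public static Map<String, Integer> build(String[][] splitLocations) {
            Map<String, Integer> popularity = new HashMap<String, Integer>();
            for (String[] locations : splitLocations) {
                for (String host : locations) {
                    Integer count = popularity.get(host);
                    popularity.put(host, count == null ? 1 : count + 1);
                }
            }
            return popularity;
        }
    }

For example, build(new String[][] { { "nc1", "nc2" }, { "nc1" } }) yields {nc1=2, nc2=1}, so nc2 is tried first when both hosts are candidates for a split.
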
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/RCFileControlledTupleParser.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/RCFileControlledTupleParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/RCFileControlledTupleParser.java
deleted file mode 100644
index c8e9c65..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/RCFileControlledTupleParser.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.dataflow;
-
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.asterix.external.indexing.IndexingConstants;
-import org.apache.asterix.external.indexing.input.RCFileLookupReader;
-import org.apache.asterix.om.base.AInt32;
-import org.apache.asterix.om.base.AInt64;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.INullWriter;
-import org.apache.hyracks.api.dataflow.value.INullWriterFactory;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
-import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
-
-public class RCFileControlledTupleParser implements IControlledTupleParser {
-
-    private ArrayTupleBuilder tb;
-    private transient DataOutput dos;
-    private final FrameTupleAppender appender;
-    private boolean propagateInput;
-    private int[] propagatedFields;
-    private FrameTupleReference frameTuple;
-    private IAsterixHDFSRecordParser parser;
-    private RCFileLookupReader reader;
-    private int[] ridFields;
-    private RecordDescriptor inRecDesc;
-    private FrameTupleAccessor tupleAccessor;
-    private ByteBufferInputStream bbis;
-    private DataInputStream dis;
-    private boolean retainNull;
-    protected byte nullByte;
-    protected ArrayTupleBuilder nullTupleBuild;
-
-    public RCFileControlledTupleParser(IHyracksTaskContext ctx, IAsterixHDFSRecordParser parser,
-            RCFileLookupReader reader, boolean propagateInput, int[] propagatedFields, RecordDescriptor inRecDesc,
-            int[] ridFields, boolean retainNull, INullWriterFactory iNullWriterFactory) throws HyracksDataException {
-        appender = new FrameTupleAppender(new VSizeFrame(ctx));
-        this.parser = parser;
-        this.reader = reader;
-        this.propagateInput = propagateInput;
-        this.propagatedFields = propagatedFields;
-        this.retainNull = retainNull;
-        this.inRecDesc = inRecDesc;
-        this.ridFields = ridFields;
-        this.tupleAccessor = new FrameTupleAccessor(inRecDesc);
-        if (propagateInput) {
-            tb = new ArrayTupleBuilder(propagatedFields.length + 1);
-        } else {
-            tb = new ArrayTupleBuilder(1);
-        }
-        frameTuple = new FrameTupleReference();
-        dos = tb.getDataOutput();
-        bbis = new ByteBufferInputStream();
-        dis = new DataInputStream(bbis);
-        nullByte = ATypeTag.NULL.serialize();
-        if (retainNull) {
-            INullWriter nullWriter = iNullWriterFactory.createNullWriter();
-            nullTupleBuild = new ArrayTupleBuilder(1);
-            DataOutput out = nullTupleBuild.getDataOutput();
-            try {
-                nullWriter.writeNull(out);
-            } catch (IOException e) {
-                throw new HyracksDataException(e);
-            }
-        } else {
-            nullTupleBuild = null;
-        }
-    }
-
-    @Override
-    public void close(IFrameWriter writer) throws Exception {
-        try {
-            reader.close();
-            appender.flush(writer, true);
-        } catch (IOException ioe) {
-            throw new HyracksDataException(ioe);
-        }
-    }
-
-    @Override
-    public void parseNext(IFrameWriter writer, ByteBuffer frameBuffer) throws HyracksDataException {
-        try {
-            int tupleCount = 0;
-            int tupleIndex = 0;
-            Object object;
-            tupleAccessor.reset(frameBuffer);
-            tupleCount = tupleAccessor.getTupleCount();
-            int fieldSlotsLength = tupleAccessor.getFieldSlotsLength();
-            // Loop over tuples
-            while (tupleIndex < tupleCount) {
-                int tupleStartOffset = tupleAccessor.getTupleStartOffset(tupleIndex) + fieldSlotsLength;
-                int fileNumberStartOffset = tupleAccessor.getFieldStartOffset(tupleIndex,
-                        ridFields[IndexingConstants.FILE_NUMBER_FIELD_INDEX]);
-                // Check if the rid field is null (outer-join case)
-                if (frameBuffer.get(tupleStartOffset + fileNumberStartOffset) == nullByte) {
-                    object = null;
-                } else {
-                    // Get file number
-                    bbis.setByteBuffer(frameBuffer, tupleStartOffset + fileNumberStartOffset);
-                    int fileNumber = ((AInt32) inRecDesc
-                            .getFields()[ridFields[IndexingConstants.FILE_NUMBER_FIELD_INDEX]].deserialize(dis))
-                                    .getIntegerValue();
-                    // Get record group offset
-                    bbis.setByteBuffer(frameBuffer, tupleStartOffset + tupleAccessor.getFieldStartOffset(tupleIndex,
-                            ridFields[IndexingConstants.RECORD_OFFSET_FIELD_INDEX]));
-                    long recordOffset = ((AInt64) inRecDesc
-                            .getFields()[ridFields[IndexingConstants.RECORD_OFFSET_FIELD_INDEX]].deserialize(dis))
-                                    .getLongValue();
-                    // Get row number
-                    bbis.setByteBuffer(frameBuffer, tupleStartOffset + tupleAccessor.getFieldStartOffset(tupleIndex,
-                            ridFields[IndexingConstants.ROW_NUMBER_FIELD_INDEX]));
-                    int rowNumber = ((AInt32) inRecDesc.getFields()[ridFields[IndexingConstants.ROW_NUMBER_FIELD_INDEX]]
-                            .deserialize(dis)).getIntegerValue();
-
-                    // Read record from external source
-                    object = reader.read(fileNumber, recordOffset, rowNumber);
-                }
-                if (object != null) {
-                    tb.reset();
-                    if (propagateInput) {
-                        frameTuple.reset(tupleAccessor, tupleIndex);
-                        for (int i = 0; i < propagatedFields.length; i++) {
-                            dos.write(frameTuple.getFieldData(propagatedFields[i]),
-                                    frameTuple.getFieldStart(propagatedFields[i]),
-                                    frameTuple.getFieldLength(propagatedFields[i]));
-                            tb.addFieldEndOffset();
-                        }
-                    }
-                    // parse record
-                    parser.parse(object, tb.getDataOutput());
-                    tb.addFieldEndOffset();
-                    addTupleToFrame(writer);
-                } else if (propagateInput && retainNull) {
-                    tb.reset();
-                    frameTuple.reset(tupleAccessor, tupleIndex);
-                    for (int i = 0; i < propagatedFields.length; i++) {
-                        dos.write(frameTuple.getFieldData(propagatedFields[i]),
-                                frameTuple.getFieldStart(propagatedFields[i]),
-                                frameTuple.getFieldLength(propagatedFields[i]));
-                        tb.addFieldEndOffset();
-                    }
-                    dos.write(nullTupleBuild.getByteArray(), 0, nullTupleBuild.getSize());
-                    tb.addFieldEndOffset();
-                    addTupleToFrame(writer);
-                }
-                tupleIndex++;
-            }
-        } catch (Exception e) {
-            // Something went wrong; try to close the reader, then rethrow (this should never happen)
-            try {
-                reader.close();
-            } catch (Exception e1) {
-                e.addSuppressed(e1);
-            }
-            throw new HyracksDataException(e);
-        }
-    }
-
-    protected void addTupleToFrame(IFrameWriter writer) throws HyracksDataException {
-        if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-            appender.flush(writer, true);
-            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                throw new IllegalStateException();
-            }
-        }
-    }
-
-}

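Each record id handled above is a triple (file number, record-group offset, row number). A condensed sketch of the per-tuple lookup step in parseNext, with RID decoding elided; only RCFileLookupReader.read(...) is taken from this diff, the wrapper is illustrative:

    import org.apache.asterix.external.indexing.input.RCFileLookupReader;

    public class RCFileLookupSketch {
        // Fetch one record from HDFS given its decoded record id.
        public static Object lookup(RCFileLookupReader reader, int fileNumber, long recordOffset, int rowNumber)
                throws Exception {
            // in the parser above, a null record becomes a null-padded tuple when retainNull is set
            return reader.read(fileNumber, recordOffset, rowNumber);
        }
    }
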
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/RCFileIndexingTupleParser.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/RCFileIndexingTupleParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/RCFileIndexingTupleParser.java
deleted file mode 100644
index eaa3381..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/RCFileIndexingTupleParser.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.dataflow;
-
-import org.apache.asterix.external.indexing.input.AbstractHDFSReader;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-
-public class RCFileIndexingTupleParser extends AbstractIndexingTupleParser {
-
-    private Integer rowNumber = -1;
-    private Integer lastFileNumber = -1;
-    private long lastByteLocation = -1;
-
-    public RCFileIndexingTupleParser(IHyracksCommonContext ctx, ARecordType recType, IAsterixHDFSRecordParser
-            deserializer)
-            throws HyracksDataException {
-        super(ctx, recType, deserializer);
-        tb = new ArrayTupleBuilder(4);
-        dos = tb.getDataOutput();
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    protected void appendIndexingData(ArrayTupleBuilder tb,
-            AbstractHDFSReader inReader) throws Exception {
-        aMutableInt.setValue(inReader.getFileNumber());
-        aMutableLong.setValue(inReader.getReaderPosition());
-        //add file number
-        tb.addField(intSerde, aMutableInt);
-        //add record offset
-        tb.addField(longSerde, aMutableLong);
-        //add row number
-        if (aMutableInt.getIntegerValue().equals(lastFileNumber) && aMutableLong.getLongValue() == lastByteLocation) {
-            rowNumber++;
-        } else {
-            lastFileNumber = aMutableInt.getIntegerValue();
-            lastByteLocation = aMutableLong.getLongValue();
-            rowNumber = 0;
-        }
-        aMutableInt.setValue(rowNumber);
-        tb.addField(intSerde, aMutableInt);
-    }
-
-}

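The bookkeeping in appendIndexingData amounts to: rows are numbered 0, 1, 2, ... within one RC record group, where a group is identified by its (file number, byte offset) pair, and the counter resets whenever either changes. A self-contained restatement with plain primitives (illustrative):

    public class RowNumberTracker {
        private int lastFileNumber = -1;
        private long lastByteLocation = -1;
        private int rowNumber = -1;

        // Returns the row number of the next record within its record group.
        public int next(int fileNumber, long byteLocation) {
            if (fileNumber == lastFileNumber && byteLocation == lastByteLocation) {
                rowNumber++; // same group: advance within the group
            } else {
                lastFileNumber = fileNumber; // new group: remember it and restart at row 0
                lastByteLocation = byteLocation;
                rowNumber = 0;
            }
            return rowNumber;
        }
    }
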
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/SeqOrTxtControlledTupleParser.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/SeqOrTxtControlledTupleParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/SeqOrTxtControlledTupleParser.java
deleted file mode 100644
index 23ddd8a..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/SeqOrTxtControlledTupleParser.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.dataflow;
-
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.asterix.external.indexing.IndexingConstants;
-import org.apache.asterix.external.indexing.input.ILookupReader;
-import org.apache.asterix.om.base.AInt32;
-import org.apache.asterix.om.base.AInt64;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.INullWriter;
-import org.apache.hyracks.api.dataflow.value.INullWriterFactory;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
-import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
-
-public class SeqOrTxtControlledTupleParser implements IControlledTupleParser {
-
-    private ArrayTupleBuilder tb;
-    private DataOutput dos;
-    private final FrameTupleAppender appender;
-    private boolean propagateInput;
-    private int[] propagatedFields;
-    private FrameTupleReference frameTuple;
-    private IAsterixHDFSRecordParser parser;
-    private ILookupReader reader;
-    private int[] ridFields;
-    private RecordDescriptor inRecDesc;
-    private FrameTupleAccessor tupleAccessor;
-    private ByteBufferInputStream bbis;
-    private DataInputStream dis;
-    private boolean retainNull;
-    protected byte nullByte;
-    protected ArrayTupleBuilder nullTupleBuild;
-
-    public SeqOrTxtControlledTupleParser(IHyracksTaskContext ctx, IAsterixHDFSRecordParser parser, ILookupReader reader,
-            boolean propagateInput, int[] propagatedFields, RecordDescriptor inRecDesc, int[] ridFields,
-            boolean retainNull, INullWriterFactory iNullWriterFactory) throws HyracksDataException {
-        appender = new FrameTupleAppender(new VSizeFrame(ctx));
-        this.parser = parser;
-        this.reader = reader;
-        this.propagateInput = propagateInput;
-        this.ridFields = ridFields;
-        this.retainNull = retainNull;
-        if (propagateInput) {
-            tb = new ArrayTupleBuilder(propagatedFields.length + 1);
-            frameTuple = new FrameTupleReference();
-            this.propagatedFields = propagatedFields;
-        } else {
-            tb = new ArrayTupleBuilder(1);
-        }
-        dos = tb.getDataOutput();
-        this.inRecDesc = inRecDesc;
-        this.tupleAccessor = new FrameTupleAccessor(inRecDesc);
-        bbis = new ByteBufferInputStream();
-        dis = new DataInputStream(bbis);
-        nullByte = ATypeTag.NULL.serialize();
-        if (retainNull) {
-            INullWriter nullWriter = iNullWriterFactory.createNullWriter();
-            nullTupleBuild = new ArrayTupleBuilder(1);
-            DataOutput out = nullTupleBuild.getDataOutput();
-            try {
-                nullWriter.writeNull(out);
-            } catch (IOException e) {
-                throw new HyracksDataException(e);
-            }
-        } else {
-            nullTupleBuild = null;
-        }
-    }
-
-    @Override
-    public void close(IFrameWriter writer) throws Exception {
-        try {
-            reader.close();
-            appender.flush(writer, true);
-        } catch (IOException ioe) {
-            throw new HyracksDataException(ioe);
-        }
-    }
-
-    @Override
-    public void parseNext(IFrameWriter writer, ByteBuffer frameBuffer) throws HyracksDataException {
-        try {
-            int tupleCount = 0;
-            int tupleIndex = 0;
-            Object record;
-            tupleAccessor.reset(frameBuffer);
-            tupleCount = tupleAccessor.getTupleCount();
-            int fieldSlotsLength = tupleAccessor.getFieldSlotsLength();
-            // Loop over incoming tuples
-            while (tupleIndex < tupleCount) {
-                int tupleStartOffset = tupleAccessor.getTupleStartOffset(tupleIndex) + fieldSlotsLength;
-                int fileNumberStartOffset = tupleAccessor.getFieldStartOffset(tupleIndex,
-                        ridFields[IndexingConstants.FILE_NUMBER_FIELD_INDEX]);
-                // Check for null (occurs on the outer side of an outer join)
-                if (frameBuffer.get(tupleStartOffset + fileNumberStartOffset) == nullByte) {
-                    record = null;
-                } else {
-                    // Get file number
-                    bbis.setByteBuffer(frameBuffer, tupleStartOffset + fileNumberStartOffset);
-                    int fileNumber = ((AInt32) inRecDesc
-                            .getFields()[ridFields[IndexingConstants.FILE_NUMBER_FIELD_INDEX]].deserialize(dis))
-                                    .getIntegerValue();
-                    // Get record offset
-                    bbis.setByteBuffer(frameBuffer, tupleStartOffset + tupleAccessor.getFieldStartOffset(tupleIndex,
-                            ridFields[IndexingConstants.RECORD_OFFSET_FIELD_INDEX]));
-                    long recordOffset = ((AInt64) inRecDesc
-                            .getFields()[ridFields[IndexingConstants.RECORD_OFFSET_FIELD_INDEX]].deserialize(dis))
-                                    .getLongValue();
-                    // Read the record
-                    record = reader.read(fileNumber, recordOffset);
-                }
-                if (record != null) {
-                    tb.reset();
-                    if (propagateInput) {
-                        frameTuple.reset(tupleAccessor, tupleIndex);
-                        for (int i = 0; i < propagatedFields.length; i++) {
-                            dos.write(frameTuple.getFieldData(propagatedFields[i]),
-                                    frameTuple.getFieldStart(propagatedFields[i]),
-                                    frameTuple.getFieldLength(propagatedFields[i]));
-                            tb.addFieldEndOffset();
-                        }
-                    }
-                    // parse it
-                    parser.parse(record, tb.getDataOutput());
-                    tb.addFieldEndOffset();
-                    addTupleToFrame(writer);
-                } else if (propagateInput && retainNull) {
-                    tb.reset();
-                    frameTuple.reset(tupleAccessor, tupleIndex);
-                    for (int i = 0; i < propagatedFields.length; i++) {
-                        dos.write(frameTuple.getFieldData(propagatedFields[i]),
-                                frameTuple.getFieldStart(propagatedFields[i]),
-                                frameTuple.getFieldLength(propagatedFields[i]));
-                        tb.addFieldEndOffset();
-                    }
-                    dos.write(nullTupleBuild.getByteArray(), 0, nullTupleBuild.getSize());
-                    tb.addFieldEndOffset();
-                    addTupleToFrame(writer);
-                }
-                tupleIndex++;
-            }
-        } catch (Exception e) {
-            try {
-                reader.close();
-            } catch (Exception e2) {
-                e.addSuppressed(e2);
-            }
-            throw new HyracksDataException(e);
-        }
-    }
-
-    private void addTupleToFrame(IFrameWriter writer) throws HyracksDataException {
-        if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-            appender.flush(writer, true);
-            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                throw new IllegalStateException("tuple is larger than the frame size");
-            }
-        }
-    }
-
-}
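
The addTupleToFrame method above is the standard Hyracks append-or-flush idiom. A minimal sketch
of the pattern, using only APIs that appear in this file (the helper class name is illustrative):

    import org.apache.hyracks.api.comm.IFrameWriter;
    import org.apache.hyracks.api.exceptions.HyracksDataException;
    import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
    import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;

    final class FrameAppendHelper {
        /** Appends the built tuple; if the current frame is full, flushes it downstream and retries once. */
        static void appendOrFlush(FrameTupleAppender appender, ArrayTupleBuilder tb, IFrameWriter writer)
                throws HyracksDataException {
            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
                appender.flush(writer, true); // frame is full: push it and start a fresh one
                if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
                    // Even an empty frame cannot hold this tuple.
                    throw new HyracksDataException("tuple is larger than the frame size");
                }
            }
        }
    }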

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/TextOrSeqIndexingTupleParser.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/TextOrSeqIndexingTupleParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/TextOrSeqIndexingTupleParser.java
deleted file mode 100644
index d44b3f3..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/TextOrSeqIndexingTupleParser.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.dataflow;
-
-import org.apache.asterix.external.indexing.input.AbstractHDFSReader;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-
-public class TextOrSeqIndexingTupleParser extends AbstractIndexingTupleParser {
-    public TextOrSeqIndexingTupleParser(IHyracksCommonContext ctx, ARecordType recType,
-            IAsterixHDFSRecordParser deserializer) throws HyracksDataException {
-        super(ctx, recType, deserializer);
-        tb = new ArrayTupleBuilder(3);
-        dos = tb.getDataOutput();
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    protected void appendIndexingData(ArrayTupleBuilder tb,
-            AbstractHDFSReader inReader) throws Exception {
-        aMutableInt.setValue(inReader.getFileNumber());
-        aMutableLong.setValue(inReader.getReaderPosition());
-
-        tb.addField(intSerde, aMutableInt);
-        tb.addField(longSerde, aMutableLong);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/AbstractHDFSLookupInputStream.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/AbstractHDFSLookupInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/AbstractHDFSLookupInputStream.java
deleted file mode 100644
index 563a46d..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/AbstractHDFSLookupInputStream.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.input;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
-import org.apache.asterix.external.indexing.ExternalFile;
-import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobConf;
-
-/*
- * This class supports seeking into and reading external data records in ADM or delimited-text
- * format stored in files of sequence or text input format.
- */
-public abstract class AbstractHDFSLookupInputStream extends InputStream {
-
-    protected String pendingValue = null;
-    protected FileSystem fs;
-    protected int fileNumber = -1;
-    protected static final byte EOL = '\n';
-    protected boolean skipFile = false;
-    protected ExternalFile file = new ExternalFile(null, null, 0, null, null, 0, ExternalFilePendingOp.PENDING_NO_OP);
-    protected ExternalFileIndexAccessor filesIndexAccessor;
-
-    public AbstractHDFSLookupInputStream(ExternalFileIndexAccessor filesIndexAccessor, JobConf conf)
-            throws IOException {
-        this.filesIndexAccessor = filesIndexAccessor;
-        fs = FileSystem.get(conf);
-    }
-
-    @Override
-    public int read(byte[] buffer, int offset, int len) throws IOException {
-        if (pendingValue != null) {
-            int size = pendingValue.length() + 1;
-            if (size > len) {
-                return 0;
-            }
-            System.arraycopy(pendingValue.getBytes(), 0, buffer, offset, pendingValue.length());
-            buffer[offset + pendingValue.length()] = (byte) EOL;
-            pendingValue = null;
-            return size;
-        }
-        return -1;
-    }
-
-    public boolean fetchRecord(int fileNumber, long recordOffset) throws Exception {
-        if (fileNumber != this.fileNumber) {
-            // New file number
-            this.fileNumber = fileNumber;
-            filesIndexAccessor.searchForFile(fileNumber, file);
-
-            try {
-                FileStatus fileStatus = fs.getFileStatus(new Path(file.getFileName()));
-                if (fileStatus.getModificationTime() != file.getLastModefiedTime().getTime()) {
-                    // The file was modified after it was indexed; skip it.
-                    skipFile = true;
-                    return false;
-                } else {
-                    skipFile = false;
-                    openFile(file.getFileName());
-                }
-            } catch (FileNotFoundException e) {
-                // The file was deleted after it was indexed, so it is no longer of interest.
-                skipFile = true;
-                return false;
-            }
-        } else if (skipFile) {
-            return false;
-        }
-        return read(recordOffset);
-    }
-
-    @Override
-    public int read() throws IOException {
-        return 0;
-    }
-
-    protected abstract boolean read(long byteLocation);
-
-    protected abstract void openFile(String fileName) throws IOException;
-
-    @Override
-    public void close() throws IOException {
-        super.close();
-    }
-
-    public ExternalFileIndexAccessor getExternalFileIndexAccessor() {
-        return filesIndexAccessor;
-    }
-
-    public void setExternalFileIndexAccessor(ExternalFileIndexAccessor filesIndexAccessor) {
-        this.filesIndexAccessor = filesIndexAccessor;
-    }
-}
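
The pendingValue hand-off is the core of the stream above: fetchRecord stages one record, and
read(byte[], int, int) emits it exactly once, newline-terminated, then reports end-of-stream. A
self-contained sketch of that protocol (the class name is hypothetical):

    import java.io.IOException;
    import java.io.InputStream;

    class SingleRecordStream extends InputStream {
        private String pendingValue;

        void setRecord(String record) {
            pendingValue = record;
        }

        @Override
        public int read(byte[] buffer, int offset, int len) throws IOException {
            if (pendingValue == null) {
                return -1; // nothing staged: end of stream
            }
            byte[] bytes = pendingValue.getBytes();
            int size = bytes.length + 1; // +1 for the '\n' terminator
            if (size > len) {
                return 0; // the caller's buffer is too small
            }
            System.arraycopy(bytes, 0, buffer, offset, bytes.length);
            buffer[offset + bytes.length] = '\n';
            pendingValue = null; // the record has been consumed
            return size;
        }

        @Override
        public int read() throws IOException {
            return 0; // single-byte reads are not used by this path
        }
    }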

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/AbstractHDFSReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/AbstractHDFSReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/AbstractHDFSReader.java
deleted file mode 100644
index 65bfcf3..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/AbstractHDFSReader.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.input;
-
-import java.io.InputStream;
-
-import org.apache.hadoop.mapred.Counters.Counter;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.Reporter;
-
-/**
- * An abstract class for reading HDFS-based datasets one record at a time (used for indexing).
- */
-public abstract class AbstractHDFSReader extends InputStream {
-
-    /**
-     * This method must be called once, before any records are read, to perform initial setup.
-     *
-     * @return true if the reader is ready for reading
-     */
-    public abstract boolean initialize() throws Exception;
-
-    /**
-     * @return the next record read, or null if the end of the stream has been reached
-     */
-    public abstract Object readNext() throws Exception;
-
-    /**
-     * @return the file name of the current file split being read
-     * @throws Exception
-     *             if the end of records has been reached
-     */
-    public abstract String getFileName() throws Exception;
-
-    /**
-     * @return the reader position of the last record read
-     * @throws Exception
-     *             if the end of records has been reached
-     */
-    public abstract long getReaderPosition() throws Exception;
-
-    /**
-     * @return the file number of the file being read
-     * @throws Exception
-     */
-    public abstract int getFileNumber() throws Exception;
-
-    protected Reporter getReporter() {
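-        // A no-op Reporter to satisfy the Hadoop RecordReader API; counters and
-        // progress reporting are intentionally ignored here.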
-        Reporter reporter = new Reporter() {
-
-            @Override
-            public Counter getCounter(Enum<?> arg0) {
-                return null;
-            }
-
-            @Override
-            public Counter getCounter(String arg0, String arg1) {
-                return null;
-            }
-
-            @Override
-            public InputSplit getInputSplit() throws UnsupportedOperationException {
-                return null;
-            }
-
-            @Override
-            public void incrCounter(Enum<?> arg0, long arg1) {
-            }
-
-            @Override
-            public void incrCounter(String arg0, String arg1, long arg2) {
-            }
-
-            @Override
-            public void setStatus(String arg0) {
-            }
-
-            @Override
-            public void progress() {
-            }
-
-            @Override
-            public float getProgress() {
-                return 0.0f;
-            }
-        };
-
-        return reporter;
-    }
-
-}
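
The contract above amounts to: call initialize() once, then pull records with readNext() until it
returns null, querying getFileNumber() and getReaderPosition() after each record to locate it. A
usage sketch under those assumptions (the driver class is hypothetical):

    import org.apache.asterix.external.indexing.input.AbstractHDFSReader;

    final class ReaderDriver {
        /** Drains a reader, printing the (file number, offset) pair recorded for each record. */
        static void drain(AbstractHDFSReader reader) throws Exception {
            if (!reader.initialize()) {
                return; // no splits are scheduled to this node
            }
            while (reader.readNext() != null) {
                // The same pair the indexing tuple parsers append after each record.
                System.out.println(reader.getFileNumber() + "@" + reader.getReaderPosition());
            }
        }
    }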

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/GenericFileAwareRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/GenericFileAwareRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/GenericFileAwareRecordReader.java
deleted file mode 100644
index ba36407..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/GenericFileAwareRecordReader.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.input;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.asterix.external.indexing.ExternalFile;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-
-/**
- * A generic reader used for indexing an external dataset or for performing a full scan over an
- * external dataset that has a stored files snapshot.
- *
- * @author alamouda
- */
-public class GenericFileAwareRecordReader extends GenericRecordReader {
-
-    private List<ExternalFile> files;
-    private FileSystem hadoopFS;
-    private long recordOffset = 0L;
-
-    public GenericFileAwareRecordReader(InputSplit[] inputSplits, String[] readSchedule, String nodeName, JobConf conf,
-            boolean[] executed, List<ExternalFile> files) throws IOException {
-        super(inputSplits, readSchedule, nodeName, conf, executed);
-        this.files = files;
-        hadoopFS = FileSystem.get(conf);
-    }
-
-    private boolean moveToNext() throws IOException {
-        for (; currentSplitIndex < inputSplits.length; currentSplitIndex++) {
-            /*
-             * Read all the partitions scheduled to the current node.
-             */
-            if (readSchedule[currentSplitIndex].equals(nodeName)) {
-                /*
-                 * Pick an unread split to read; synchronize among simultaneous
-                 * partitions on the same machine.
-                 */
-                synchronized (executed) {
-                    if (!executed[currentSplitIndex]) {
-                        executed[currentSplitIndex] = true;
-                    } else {
-                        continue;
-                    }
-                }
-
-                /*
-                 * Read the split.
-                 */
-                try {
-                    String fileName = ((FileSplit) (inputSplits[currentSplitIndex])).getPath().toUri().getPath();
-                    FileStatus fileStatus = hadoopFS.getFileStatus(new Path(fileName));
-                    // Skip the split if it is not the same file stored in the files snapshot.
-                    if (fileStatus.getModificationTime() != files.get(currentSplitIndex).getLastModefiedTime()
-                            .getTime()) {
-                        continue;
-                    }
-                    reader = getRecordReader(currentSplitIndex);
-                } catch (Exception e) {
-                    // The file is gone or unreadable; move on to the next split.
-                    continue;
-                }
-                key = reader.createKey();
-                value = reader.createValue();
-                return true;
-            }
-        }
-        return false;
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public Object readNext() throws IOException {
-
-        if (reader == null) {
-            return null;
-        }
-        recordOffset = reader.getPos();
-        if (reader.next(key, value)) {
-            return value;
-        }
-        while (moveToNext()) {
-            recordOffset = reader.getPos();
-            if (reader.next(key, value)) {
-                return value;
-            }
-        }
-        return null;
-    }
-
-    @Override
-    public String getFileName() throws Exception {
-        return files.get(currentSplitIndex).getFileName();
-    }
-
-    @Override
-    public long getReaderPosition() throws Exception {
-        return recordOffset;
-    }
-
-    @Override
-    public int getFileNumber() throws Exception {
-        return files.get(currentSplitIndex).getFileNumber();
-    }
-
-}
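
The modification-time comparison in moveToNext() is what ties a split back to the stored snapshot;
in the code above, files.get(currentSplitIndex).getLastModefiedTime().getTime() supplies the
snapshot time. The check in isolation (the helper name is hypothetical):

    import java.io.IOException;

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    final class SnapshotCheck {
        /** Returns true only if the HDFS file is unchanged since the snapshot was taken. */
        static boolean matchesSnapshot(FileSystem fs, String fileName, long snapshotTimeMillis)
                throws IOException {
            FileStatus status = fs.getFileStatus(new Path(fileName));
            return status.getModificationTime() == snapshotTimeMillis;
        }
    }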

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/GenericRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/GenericRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/GenericRecordReader.java
deleted file mode 100644
index ab050a7..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/GenericRecordReader.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.input;
-
-import java.io.IOException;
-
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
-
-/**
- * A record reader that can be used with any input format to perform full-scan operations.
- */
-@SuppressWarnings({ "rawtypes", "unchecked" })
-public class GenericRecordReader extends AbstractHDFSReader {
-
-    protected RecordReader reader;
-    protected Object key;
-    protected Object value;
-    protected int currentSplitIndex = 0;
-    protected boolean[] executed;
-    protected InputSplit[] inputSplits;
-    protected String[] readSchedule;
-    protected String nodeName;
-    protected JobConf conf;
-
-    public GenericRecordReader(InputSplit[] inputSplits, String[] readSchedule, String nodeName, JobConf conf,
-            boolean[] executed) {
-        this.inputSplits = inputSplits;
-        this.readSchedule = readSchedule;
-        this.nodeName = nodeName;
-        this.conf = conf;
-        this.executed = executed;
-    }
-
-    @Override
-    public boolean initialize() throws IOException {
-        return moveToNext();
-    }
-
-    private boolean moveToNext() throws IOException {
-        for (; currentSplitIndex < inputSplits.length; currentSplitIndex++) {
-            /*
-             * Read all the partitions scheduled to the current node.
-             */
-            if (readSchedule[currentSplitIndex].equals(nodeName)) {
-                /*
-                 * Pick an unread split to read; synchronize among simultaneous
-                 * partitions on the same machine.
-                 */
-                synchronized (executed) {
-                    if (!executed[currentSplitIndex]) {
-                        executed[currentSplitIndex] = true;
-                    } else {
-                        continue;
-                    }
-                }
-
-                /*
-                 * Read the split.
-                 */
-                reader = getRecordReader(currentSplitIndex);
-                key = reader.createKey();
-                value = reader.createValue();
-                return true;
-            }
-        }
-        return false;
-    }
-
-    protected RecordReader getRecordReader(int splitIndex) throws IOException {
-        return conf.getInputFormat().getRecordReader(inputSplits[splitIndex], conf, getReporter());
-    }
-
-    @Override
-    public Object readNext() throws IOException {
-        if (reader == null) {
-            return null;
-        }
-        if (reader.next(key, value)) {
-            return value;
-        }
-        while (moveToNext()) {
-            if (reader.next(key, value)) {
-                return value;
-            }
-        }
-        return null;
-    }
-
-    @Override
-    public String getFileName() throws Exception {
-        return null;
-    }
-
-    @Override
-    public long getReaderPosition() throws Exception {
-        return reader.getPos();
-    }
-
-    @Override
-    public int getFileNumber() throws Exception {
-        throw new NotImplementedException("This reader doesn't support this function");
-    }
-
-    @Override
-    public int read(byte[] buffer, int offset, int len) throws IOException {
-        throw new NotImplementedException("Use readNext()");
-    }
-
-    @Override
-    public int read() throws IOException {
-        throw new NotImplementedException("Use readNext()");
-    }
-
-}
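
The synchronized block in moveToNext() is the part worth isolating: all reader threads on a machine
share one executed array, so each split scheduled to the node is claimed and read exactly once. A
standalone sketch of that claiming scheme (the class name is hypothetical):

    final class SplitClaimer {
        private final String[] readSchedule; // node name assigned to each split
        private final boolean[] executed;    // shared by all readers on this machine
        private final String nodeName;
        private int current = 0;

        SplitClaimer(String[] readSchedule, boolean[] executed, String nodeName) {
            this.readSchedule = readSchedule;
            this.executed = executed;
            this.nodeName = nodeName;
        }

        /** Returns the index of the next unclaimed split scheduled to this node, or -1 if none remain. */
        int claimNext() {
            for (; current < readSchedule.length; current++) {
                if (!readSchedule[current].equals(nodeName)) {
                    continue; // scheduled to another node
                }
                synchronized (executed) {
                    if (!executed[current]) {
                        executed[current] = true; // claim the split atomically
                        return current++;
                    }
                }
            }
            return -1;
        }
    }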

