asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject [16/21] incubator-asterixdb git commit: First stage of external data cleanup
Date Sun, 03 Jan 2016 17:41:14 GMT
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RCRecordIdReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RCRecordIdReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RCRecordIdReader.java
new file mode 100644
index 0000000..07d09db
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RCRecordIdReader.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.indexing;
+
+import org.apache.asterix.om.base.AInt32;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * A {@link RecordIdReader} that, after the parent has filled in the fields it reads,
+ * additionally reads the row-number field of the record id.
+ */
+public class RCRecordIdReader extends RecordIdReader {
+
+    /**
+     * @param ridFields
+     *            positions of the record id fields inside an input tuple; passed to the parent unchanged
+     */
+    public RCRecordIdReader(int[] ridFields) {
+        super(ridFields);
+    }
+
+    /**
+     * Reads the record id of the tuple at {@code index}, including its row number.
+     *
+     * @param index
+     *            index of the tuple in the frame currently held by the tuple accessor
+     * @return the reused {@link RecordId} instance of this reader, or {@code null} when the parent reader
+     *         returned {@code null}
+     * @throws HyracksDataException
+     *             propagated from the parent reader or from deserializing the row-number field
+     */
+    @Override
+    public RecordId read(int index) throws HyracksDataException {
+        final RecordId parentResult = super.read(index);
+        if (parentResult == null) {
+            return null;
+        }
+        // Point the stream at the row-number field, then decode that field as an AInt32.
+        final int rowField = ridFields[IndexingConstants.ROW_NUMBER_FIELD_INDEX];
+        final int rowFieldPosition = tupleStartOffset + tupleAccessor.getFieldStartOffset(index, rowField);
+        bbis.setByteBuffer(frameBuffer, rowFieldPosition);
+        final AInt32 rowNumber = (AInt32) inRecDesc.getFields()[rowField].deserialize(dis);
+        rid.setRow(rowNumber.getIntegerValue());
+        return rid;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordColumnarIndexer.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordColumnarIndexer.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordColumnarIndexer.java
new file mode 100644
index 0000000..14235c0
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordColumnarIndexer.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.indexing;
+
+import java.io.IOException;
+
+import org.apache.asterix.external.api.IExternalIndexer;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.input.record.reader.HDFSRecordReader;
+import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import org.apache.asterix.om.base.AMutableInt32;
+import org.apache.asterix.om.base.AMutableInt64;
+import org.apache.asterix.om.base.IAObject;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+
+public class RecordColumnarIndexer implements IExternalIndexer {
+
+    private static final long serialVersionUID = 1L;
+    public static final int NUM_OF_FIELDS = 3;
+    protected AMutableInt32 fileNumber = new AMutableInt32(0);
+    protected AMutableInt64 offset = new AMutableInt64(0);
+    protected long nextOffset;
+    protected AMutableInt32 rowNumber = new AMutableInt32(0);
+    protected RecordReader<?, Writable> recordReader;
+
+    @SuppressWarnings("unchecked")
+    private ISerializerDeserializer<IAObject> intSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.AINT32);
+    @SuppressWarnings("unchecked")
+    private ISerializerDeserializer<IAObject> longSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.AINT64);
+
+    @Override
+    public void reset(IRecordReader<?> reader) throws IOException {
+        //TODO: Make this more generic. right now, it works because we only index hdfs files.
+        @SuppressWarnings("unchecked")
+        HDFSRecordReader<?, Writable> hdfsReader = (HDFSRecordReader<?, Writable>) reader;
+        fileNumber.setValue(hdfsReader.getSnapshot().get(hdfsReader.getCurrentSplitIndex()).getFileNumber());
+        recordReader = hdfsReader.getReader();
+        offset.setValue(recordReader.getPos());
+        nextOffset = offset.getLongValue();
+        rowNumber.setValue(0);
+    }
+
+    @Override
+    public void index(ArrayTupleBuilder tb) throws IOException {
+        if (recordReader.getPos() != nextOffset) {
+            // start of a new group
+            offset.setValue(nextOffset);
+            nextOffset = recordReader.getPos();
+            rowNumber.setValue(0);
+        }
+        tb.addField(intSerde, fileNumber);
+        tb.addField(longSerde, offset);
+        tb.addField(intSerde, rowNumber);
+        rowNumber.setValue(rowNumber.getIntegerValue() + 1);
+    }
+
+    @Override
+    public int getNumberOfFields() {
+        return NUM_OF_FIELDS;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordId.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordId.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordId.java
new file mode 100644
index 0000000..9027101
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordId.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.indexing;
+
+/**
+ * Mutable holder for the three components of a record id: a file id, an offset and a row.
+ * All components default to zero until set.
+ */
+public class RecordId {
+
+    /** The kinds of record id that exist. */
+    public enum RecordIdType {
+        OFFSET,
+        RC
+    }
+
+    private int fileId;
+    private long offset;
+    private int row;
+
+    /** @return the file id last stored with {@link #setFileId(int)} */
+    public int getFileId() {
+        return this.fileId;
+    }
+
+    /** @return the offset last stored with {@link #setOffset(long)} */
+    public long getOffset() {
+        return this.offset;
+    }
+
+    /** @return the row last stored with {@link #setRow(int)} */
+    public int getRow() {
+        return this.row;
+    }
+
+    /** @param fileId the file id to store */
+    public void setFileId(int fileId) {
+        this.fileId = fileId;
+    }
+
+    /** @param offset the offset to store */
+    public void setOffset(long offset) {
+        this.offset = offset;
+    }
+
+    /** @param row the row to store */
+    public void setRow(int row) {
+        this.row = row;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordIdReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordIdReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordIdReader.java
new file mode 100644
index 0000000..2b4cc9c
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordIdReader.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.indexing;
+
+import java.io.DataInputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.om.base.AInt32;
+import org.apache.asterix.om.base.AInt64;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+
+/**
+ * Reads the file number and offset components of a record id out of a tuple in a frame.
+ * The same {@link RecordId} instance is reused across calls to {@link #read(int)}.
+ */
+public class RecordIdReader {
+
+    /** Serialized type tag used to detect a null file-number field. */
+    private static final byte NULL_TYPE_TAG = ATypeTag.NULL.serialize();
+    protected FrameTupleAccessor tupleAccessor;
+    protected int fieldSlotsLength;
+    protected int[] ridFields;
+    protected RecordId rid;
+    protected RecordDescriptor inRecDesc;
+    protected ByteBufferInputStream bbis;
+    protected DataInputStream dis;
+    protected int tupleStartOffset;
+    protected ByteBuffer frameBuffer;
+
+    /**
+     * @param ridFields
+     *            positions of the record id fields inside an input tuple, indexed by the
+     *            {@link IndexingConstants} field-index constants
+     */
+    public RecordIdReader(int[] ridFields) {
+        this.rid = new RecordId();
+        this.ridFields = ridFields;
+    }
+
+    /**
+     * Binds this reader to a tuple accessor and record descriptor and creates fresh input streams.
+     *
+     * @param accessor
+     *            accessor over the frame whose tuples will be read
+     * @param inRecDesc
+     *            descriptor supplying the serializer/deserializer of each input field
+     */
+    public void set(FrameTupleAccessor accessor, RecordDescriptor inRecDesc) {
+        this.tupleAccessor = accessor;
+        this.fieldSlotsLength = accessor.getFieldSlotsLength();
+        this.inRecDesc = inRecDesc;
+        final ByteBufferInputStream bufferStream = new ByteBufferInputStream();
+        this.bbis = bufferStream;
+        this.dis = new DataInputStream(bufferStream);
+    }
+
+    /**
+     * Reads the file number and offset of the record id stored in the tuple at {@code index}.
+     *
+     * @param index
+     *            index of the tuple in the frame held by the tuple accessor
+     * @return the reused {@link RecordId} instance, or {@code null} when the first byte of the
+     *         file-number field equals the null type tag
+     * @throws HyracksDataException
+     *             propagated from field deserialization
+     */
+    public RecordId read(int index) throws HyracksDataException {
+        tupleStartOffset = fieldSlotsLength + tupleAccessor.getTupleStartOffset(index);
+        final int fileNumberField = ridFields[IndexingConstants.FILE_NUMBER_FIELD_INDEX];
+        final int fileNumberPosition = tupleStartOffset
+                + tupleAccessor.getFieldStartOffset(index, fileNumberField);
+        frameBuffer = tupleAccessor.getBuffer();
+        final boolean fileNumberIsNull = frameBuffer.get(fileNumberPosition) == NULL_TYPE_TAG;
+        if (fileNumberIsNull) {
+            return null;
+        }
+
+        // File number
+        bbis.setByteBuffer(frameBuffer, fileNumberPosition);
+        final AInt32 fileNumber = (AInt32) inRecDesc.getFields()[fileNumberField].deserialize(dis);
+        rid.setFileId(fileNumber.getIntegerValue());
+
+        // Record group offset
+        final int offsetField = ridFields[IndexingConstants.RECORD_OFFSET_FIELD_INDEX];
+        final int offsetPosition = tupleStartOffset + tupleAccessor.getFieldStartOffset(index, offsetField);
+        bbis.setByteBuffer(frameBuffer, offsetPosition);
+        final AInt64 groupOffset = (AInt64) inRecDesc.getFields()[offsetField].deserialize(dis);
+        rid.setOffset(groupOffset.getLongValue());
+        return rid;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordIdReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordIdReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordIdReaderFactory.java
new file mode 100644
index 0000000..d0bf2ff
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordIdReaderFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.indexing;
+
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.util.HDFSUtils;
+
+/**
+ * Creates the {@link RecordIdReader} variant matching the record id type derived from a configuration.
+ */
+public class RecordIdReaderFactory {
+
+    /**
+     * @param configuration
+     *            configuration from which {@link HDFSUtils#getRecordIdType} derives the record id type
+     * @param ridFields
+     *            positions of the record id fields inside an input tuple; passed to the created reader
+     * @return a {@link RecordIdReader} for {@code OFFSET}, an {@link RCRecordIdReader} for {@code RC}
+     * @throws AsterixException
+     *             if the record id type is neither {@code OFFSET} nor {@code RC}
+     */
+    public static RecordIdReader create(Map<String, String> configuration, int[] ridFields) throws AsterixException {
+        final RecordId.RecordIdType idType = HDFSUtils.getRecordIdType(configuration);
+        switch (idType) {
+            case RC:
+                return new RCRecordIdReader(ridFields);
+            case OFFSET:
+                return new RecordIdReader(ridFields);
+            default:
+                throw new AsterixException("Unknown Record Id type: " + idType);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/AbstractIndexingTupleParser.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/AbstractIndexingTupleParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/AbstractIndexingTupleParser.java
deleted file mode 100644
index 07e09bd..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/AbstractIndexingTupleParser.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.dataflow;
-
-import java.io.DataOutput;
-import java.io.InputStream;
-
-import org.apache.asterix.external.indexing.input.AbstractHDFSReader;
-import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import org.apache.asterix.om.base.AMutableInt32;
-import org.apache.asterix.om.base.AMutableInt64;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.BuiltinType;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.std.file.ITupleParser;
-
-/**
- * Base tuple parser that reads records from an {@link AbstractHDFSReader}, parses each one
- * into a tuple, lets the subclass append indexing fields, and pushes the tuples to a frame writer.
- */
-public abstract class AbstractIndexingTupleParser implements ITupleParser{
-
-    // Not assigned anywhere in this class; presumably initialized by subclasses -- TODO confirm.
-    protected ArrayTupleBuilder tb;
-    protected DataOutput dos;
-    protected final FrameTupleAppender appender;
-    protected final ARecordType recType;
-    protected final IHyracksCommonContext ctx;
-    protected final IAsterixHDFSRecordParser deserializer;
-    // Reusable mutable values, available to subclasses for writing the indexing fields.
-    protected final AMutableInt32 aMutableInt = new AMutableInt32(0);
-    protected final AMutableInt64 aMutableLong = new AMutableInt64(0);
-    
-    @SuppressWarnings("rawtypes")
-    protected final ISerializerDeserializer intSerde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
-    @SuppressWarnings("rawtypes")
-    protected final ISerializerDeserializer longSerde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64);
-    
-    /**
-     * @param ctx
-     *            context used to allocate the frame backing the appender
-     * @param recType
-     *            record type stored for use by subclasses
-     * @param deserializer
-     *            parser that turns each record read from the input into tuple bytes
-     * @throws HyracksDataException
-     *             if the frame cannot be created
-     */
-    public AbstractIndexingTupleParser(IHyracksCommonContext ctx, ARecordType recType, IAsterixHDFSRecordParser
-            deserializer) throws HyracksDataException {
-        appender = new FrameTupleAppender(new VSizeFrame(ctx));
-        this.recType = recType;
-        this.ctx = ctx;
-        this.deserializer = deserializer;
-    }
-
-    /**
-     * Reads records until the reader returns {@code null}, emitting one tuple per record.
-     *
-     * @param in
-     *            must be an {@link AbstractHDFSReader}; it is cast without a check
-     * @param writer
-     *            receives the frames as they fill up, and the final partial frame
-     * @throws HyracksDataException
-     *             wrapping any exception raised while reading, parsing or writing
-     */
-    @Override
-    public void parse(InputStream in, IFrameWriter writer) throws HyracksDataException {
-        AbstractHDFSReader inReader = (AbstractHDFSReader) in;
-        Object record;
-        try {
-            inReader.initialize();
-            record = inReader.readNext();
-            while (record != null) {
-                tb.reset();
-                // The parsed record becomes the first field of the tuple.
-                deserializer.parse(record, tb.getDataOutput());
-                tb.addFieldEndOffset();
-                //append indexing fields
-                appendIndexingData(tb, inReader);
-                addTupleToFrame(writer);
-                record = inReader.readNext();
-            }
-            // Push whatever is left in the last frame.
-            appender.flush(writer, true);
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
-        }
-    }
-
-    /**
-     * Appends the indexing fields for the record that was just parsed.
-     *
-     * @param tb
-     *            tuple builder already holding the parsed record
-     * @param inReader
-     *            reader the record came from
-     * @throws Exception
-     *             on any failure; {@link #parse} wraps it in a {@link HyracksDataException}
-     */
-    protected abstract void appendIndexingData(ArrayTupleBuilder tb,
-            AbstractHDFSReader inReader) throws Exception;
-
-    /**
-     * Appends the tuple held by {@code tb} to the current frame, flushing the frame first when it is full.
-     *
-     * @throws IllegalStateException
-     *             if the tuple does not fit even after the frame was flushed
-     */
-    protected void addTupleToFrame(IFrameWriter writer) throws HyracksDataException {
-        if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-            appender.flush(writer, true);
-            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                throw new IllegalStateException("Record is too big to fit in a frame");
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/AdmOrDelimitedControlledTupleParser.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/AdmOrDelimitedControlledTupleParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/AdmOrDelimitedControlledTupleParser.java
deleted file mode 100644
index c94be6a..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/AdmOrDelimitedControlledTupleParser.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.dataflow;
-
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.asterix.external.indexing.IndexingConstants;
-import org.apache.asterix.external.indexing.input.AbstractHDFSLookupInputStream;
-import org.apache.asterix.om.base.AInt32;
-import org.apache.asterix.om.base.AInt64;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.runtime.operators.file.IDataParser;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.INullWriter;
-import org.apache.hyracks.api.dataflow.value.INullWriterFactory;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
-import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
-
-/**
- * Implementation of IControlledTupleParser for external text formats (adm or delimited text).
- * For every tuple of an incoming frame it reads a record id (file number and record offset),
- * asks the lookup input stream to fetch that record, parses it, and packs the resulting
- * tuples into output frames in a pipelined manner.
- */
-public class AdmOrDelimitedControlledTupleParser implements IControlledTupleParser {
-
-    private ArrayTupleBuilder tb;
-    private transient DataOutput dos;
-    private final FrameTupleAppender appender;
-    protected final ARecordType recType;
-    private IDataParser parser;
-    private boolean propagateInput;
-    private int[] propagatedFields;
-    private int[] ridFields;
-    private RecordDescriptor inRecDesc;
-    private FrameTupleAccessor tupleAccessor;
-    private FrameTupleReference frameTuple;
-    private ByteBufferInputStream bbis;
-    private DataInputStream dis;
-    private AbstractHDFSLookupInputStream in;
-    // The data parser is initialized lazily, on the first record that is actually found.
-    private boolean parserInitialized = false;
-    private boolean retainNull;
-    protected byte nullByte;
-    // One-field builder holding a serialized null; only created when retainNull is set.
-    protected ArrayTupleBuilder nullTupleBuild;
-
-    /**
-     * @param ctx
-     *            task context used to allocate the output frame
-     * @param recType
-     *            record type handed to the data parser on initialization
-     * @param in
-     *            lookup stream that fetches a record given a file number and a record offset
-     * @param propagateInput
-     *            whether fields of the input tuple are copied in front of the parsed record
-     * @param inRecDesc
-     *            descriptor of the input tuples
-     * @param parser
-     *            data parser that reads from {@code in}
-     * @param propagatedFields
-     *            input field positions to copy when {@code propagateInput} is set
-     * @param ridFields
-     *            input field positions of the record id components
-     * @param retainNull
-     *            whether a null record is emitted for tuples whose record was not found
-     * @param iNullWriterFactory
-     *            factory for the writer that serializes the null; only used when {@code retainNull} is set
-     * @throws HyracksDataException
-     *             if the output frame cannot be created
-     */
-    public AdmOrDelimitedControlledTupleParser(IHyracksTaskContext ctx, ARecordType recType,
-            AbstractHDFSLookupInputStream in, boolean propagateInput, RecordDescriptor inRecDesc, IDataParser parser,
-            int[] propagatedFields, int[] ridFields, boolean retainNull, INullWriterFactory iNullWriterFactory)
-                    throws HyracksDataException {
-        this.recType = recType;
-        this.in = in;
-        this.propagateInput = propagateInput;
-        this.retainNull = retainNull;
-        this.inRecDesc = inRecDesc;
-        this.propagatedFields = propagatedFields;
-        this.ridFields = ridFields;
-        this.parser = parser;
-        this.tupleAccessor = new FrameTupleAccessor(inRecDesc);
-        appender = new FrameTupleAppender(new VSizeFrame(ctx));
-        // Output tuples carry the propagated fields (if any) followed by one field for the record.
-        if (propagateInput) {
-            tb = new ArrayTupleBuilder(propagatedFields.length + 1);
-        } else {
-            tb = new ArrayTupleBuilder(1);
-        }
-        frameTuple = new FrameTupleReference();
-        dos = tb.getDataOutput();
-        bbis = new ByteBufferInputStream();
-        dis = new DataInputStream(bbis);
-        nullByte = ATypeTag.NULL.serialize();
-        if (retainNull) {
-            INullWriter nullWriter = iNullWriterFactory.createNullWriter();
-            nullTupleBuild = new ArrayTupleBuilder(1);
-            DataOutput out = nullTupleBuild.getDataOutput();
-            try {
-                nullWriter.writeNull(out);
-            } catch (IOException e) {
-                // NOTE(review): the failure is only printed; construction continues with
-                // whatever was written to nullTupleBuild so far.
-                e.printStackTrace();
-            }
-        } else {
-            nullTupleBuild = null;
-        }
-    }
-
-    /**
-     * Closes the lookup stream and flushes the last, possibly partial, frame to {@code writer}.
-     *
-     * @throws Exception
-     *             any failure is rethrown wrapped in a {@link HyracksDataException}
-     */
-    @Override
-    public void close(IFrameWriter writer) throws Exception {
-        try {
-            in.close();
-            appender.flush(writer, true);
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
-        }
-    }
-
-    /**
-     * Processes every tuple of {@code frameBuffer}: looks up the record its record id points to and
-     * emits an output tuple for it. Tuples whose record is not found are dropped unless both
-     * {@code propagateInput} and {@code retainNull} are set, in which case the propagated fields
-     * are emitted followed by a null.
-     *
-     * @param writer
-     *            receives output frames as they fill up
-     * @param frameBuffer
-     *            frame holding the input tuples
-     * @throws HyracksDataException
-     *             wrapping any exception; the lookup stream is closed before it is thrown
-     */
-    @Override
-    public void parseNext(IFrameWriter writer, ByteBuffer frameBuffer) throws HyracksDataException {
-        try {
-            int tupleCount = 0;
-            int tupleIndex = 0;
-            tupleAccessor.reset(frameBuffer);
-            tupleCount = tupleAccessor.getTupleCount();
-            int fieldSlotsLength = tupleAccessor.getFieldSlotsLength();
-            // Loop over tuples
-            while (tupleIndex < tupleCount) {
-                boolean found = false;
-                int tupleStartOffset = tupleAccessor.getTupleStartOffset(tupleIndex) + fieldSlotsLength;
-                int fileNumberStartOffset = tupleAccessor.getFieldStartOffset(tupleIndex,
-                        ridFields[IndexingConstants.FILE_NUMBER_FIELD_INDEX]);
-                // Check if null <- for outer join ->
-                if (frameBuffer.get(tupleStartOffset + fileNumberStartOffset) == nullByte) {
-                    // Null file number: nothing to look up, found stays false.
-                } else {
-                    // Get file number
-                    bbis.setByteBuffer(frameBuffer, tupleStartOffset + fileNumberStartOffset);
-                    int fileNumber = ((AInt32) inRecDesc
-                            .getFields()[ridFields[IndexingConstants.FILE_NUMBER_FIELD_INDEX]].deserialize(dis))
-                                    .getIntegerValue();
-                    // Get record offset
-                    bbis.setByteBuffer(frameBuffer, tupleStartOffset + tupleAccessor.getFieldStartOffset(tupleIndex,
-                            ridFields[IndexingConstants.RECORD_OFFSET_FIELD_INDEX]));
-                    long recordOffset = ((AInt64) inRecDesc
-                            .getFields()[ridFields[IndexingConstants.RECORD_OFFSET_FIELD_INDEX]].deserialize(dis))
-                                    .getLongValue();
-                    found = in.fetchRecord(fileNumber, recordOffset);
-                }
-                if (found) {
-                    // Since we now know the inputStream is ready, we can safely initialize the parser
-                    // We can't do that earlier since the parser will start pulling from the stream and if it is not ready,
-                    // The parser will automatically release its resources
-                    if (!parserInitialized) {
-                        parser.initialize(in, recType, true);
-                        parserInitialized = true;
-                    }
-                    tb.reset();
-                    if (propagateInput) {
-                        // Copy the selected input fields verbatim in front of the parsed record.
-                        frameTuple.reset(tupleAccessor, tupleIndex);
-                        for (int i = 0; i < propagatedFields.length; i++) {
-                            dos.write(frameTuple.getFieldData(propagatedFields[i]),
-                                    frameTuple.getFieldStart(propagatedFields[i]),
-                                    frameTuple.getFieldLength(propagatedFields[i]));
-                            tb.addFieldEndOffset();
-                        }
-                    }
-                    parser.parse(tb.getDataOutput());
-                    tb.addFieldEndOffset();
-                    addTupleToFrame(writer);
-                } else if (propagateInput && retainNull) {
-                    // Record not found: keep the input fields and put a null where the record would go.
-                    tb.reset();
-                    frameTuple.reset(tupleAccessor, tupleIndex);
-                    for (int i = 0; i < propagatedFields.length; i++) {
-                        dos.write(frameTuple.getFieldData(propagatedFields[i]),
-                                frameTuple.getFieldStart(propagatedFields[i]),
-                                frameTuple.getFieldLength(propagatedFields[i]));
-                        tb.addFieldEndOffset();
-                    }
-                    // NOTE(review): this writes the whole array returned by getByteArray(); if that is the
-                    // builder's backing array it may be longer than the serialized null -- confirm.
-                    dos.write(nullTupleBuild.getByteArray());
-                    tb.addFieldEndOffset();
-                    addTupleToFrame(writer);
-                }
-                tupleIndex++;
-            }
-        } catch (Exception e) {
-            // Unexpected error: try to close the input stream, then rethrow wrapped
-            try {
-                in.close();
-            } catch (IOException e1) {
-                e1.printStackTrace();
-            }
-            throw new HyracksDataException(e);
-        }
-    }
-
-    // For debugging
-    // Dumps every tuple and field of the accessor's frame to System.err: offsets first, then the
-    // deserialized value, or NULL when the field's first byte is the null type tag.
-    public void prettyPrint(FrameTupleAccessor tupleAccessor, RecordDescriptor recDesc) {
-        ByteBufferInputStream bbis = new ByteBufferInputStream();
-        DataInputStream dis = new DataInputStream(bbis);
-        int tc = tupleAccessor.getTupleCount();
-        System.err.println("TC: " + tc);
-        for (int i = 0; i < tc; ++i) {
-            System.err.print(
-                    i + ":(" + tupleAccessor.getTupleStartOffset(i) + ", " + tupleAccessor.getTupleEndOffset(i) + ")[");
-            for (int j = 0; j < tupleAccessor.getFieldCount(); ++j) {
-                System.err.print(j + ":(" + tupleAccessor.getFieldStartOffset(i, j) + ", "
-                        + tupleAccessor.getFieldEndOffset(i, j) + ") ");
-                System.err.print("{");
-                bbis.setByteBuffer(tupleAccessor.getBuffer(), tupleAccessor.getTupleStartOffset(i)
-                        + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(i, j));
-                try {
-                    byte tag = dis.readByte();
-                    if (tag == nullByte) {
-                        System.err.print("NULL");
-                    } else {
-                        // Rewind to the start of the field: the tag byte was consumed by the peek above.
-                        bbis.setByteBuffer(tupleAccessor.getBuffer(), tupleAccessor.getTupleStartOffset(i)
-                                + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(i, j));
-                        System.err.print(recDesc.getFields()[j].deserialize(dis));
-                    }
-                } catch (IOException e) {
-                    e.printStackTrace();
-                }
-                System.err.print("}");
-            }
-            System.err.println("]");
-        }
-    }
-
-    /**
-     * Appends the tuple held by {@code tb} to the current frame, flushing the frame first when it is full.
-     *
-     * @throws IllegalStateException
-     *             if the tuple does not fit even after the frame was flushed
-     */
-    protected void addTupleToFrame(IFrameWriter writer) throws HyracksDataException {
-        if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-            appender.flush(writer, true);
-            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                throw new IllegalStateException();
-            }
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/AdmOrDelimitedIndexingTupleParser.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/AdmOrDelimitedIndexingTupleParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/AdmOrDelimitedIndexingTupleParser.java
deleted file mode 100644
index 6abcbb8..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/AdmOrDelimitedIndexingTupleParser.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.dataflow;
-
-import java.io.InputStream;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.external.indexing.input.AbstractHDFSReader;
-import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import org.apache.asterix.om.base.AMutableInt32;
-import org.apache.asterix.om.base.AMutableInt64;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.BuiltinType;
-import org.apache.asterix.runtime.operators.file.IDataParser;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.std.file.ITupleParser;
-
-public class AdmOrDelimitedIndexingTupleParser implements ITupleParser {
-
-    private ArrayTupleBuilder tb;
-    private final FrameTupleAppender appender;
-    private final ARecordType recType;
-    private final IDataParser parser;
-    private final AMutableInt32 aMutableInt = new AMutableInt32(0);
-    private final AMutableInt64 aMutableLong = new AMutableInt64(0);
-
-    @SuppressWarnings("rawtypes")
-    private ISerializerDeserializer intSerde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.AINT32);
-    @SuppressWarnings("rawtypes")
-    private ISerializerDeserializer longSerde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.AINT64);
-
-    public AdmOrDelimitedIndexingTupleParser(IHyracksCommonContext ctx, ARecordType recType, IDataParser parser)
-            throws HyracksDataException {
-        this.parser = parser;
-        this.recType = recType;
-        appender = new FrameTupleAppender(new VSizeFrame(ctx));
-        tb = new ArrayTupleBuilder(3);
-    }
-
-    @Override
-    public void parse(InputStream in, IFrameWriter writer) throws HyracksDataException {
-        // Cast the input stream to a record reader
-        AbstractHDFSReader inReader = (AbstractHDFSReader) in;
-        try {
-            parser.initialize(in, recType, true);
-            while (true) {
-                tb.reset();
-                if (!parser.parse(tb.getDataOutput())) {
-                    break;
-                }
-                tb.addFieldEndOffset();
-                appendIndexingData(tb, inReader);
-                addTupleToFrame(writer);
-            }
-            appender.flush(writer, true);
-        } catch (AsterixException ae) {
-            throw new HyracksDataException(ae);
-        } catch (Exception ioe) {
-            throw new HyracksDataException(ioe);
-        }
-    }
-
-    // This function is used to append RID to Hyracks tuple
-    @SuppressWarnings("unchecked")
-    private void appendIndexingData(ArrayTupleBuilder tb, AbstractHDFSReader inReader) throws Exception {
-        aMutableInt.setValue(inReader.getFileNumber());
-        aMutableLong.setValue(inReader.getReaderPosition());
-        tb.addField(intSerde, aMutableInt);
-        tb.addField(longSerde, aMutableLong);
-    }
-
-    private void addTupleToFrame(IFrameWriter writer) throws HyracksDataException {
-        if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-            appender.flush(writer, true);
-            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                throw new IllegalStateException("Record is too big to fit in a frame");
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/FileIndexTupleTranslator.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/FileIndexTupleTranslator.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/FileIndexTupleTranslator.java
deleted file mode 100644
index 9271ebe..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/FileIndexTupleTranslator.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.dataflow;
-
-import java.io.IOException;
-
-import org.apache.asterix.builders.RecordBuilder;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.external.indexing.ExternalFile;
-import org.apache.asterix.external.indexing.FilesIndexDescription;
-import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import org.apache.asterix.om.base.ADateTime;
-import org.apache.asterix.om.base.AInt64;
-import org.apache.asterix.om.base.AMutableDateTime;
-import org.apache.asterix.om.base.AMutableInt32;
-import org.apache.asterix.om.base.AMutableInt64;
-import org.apache.asterix.om.base.AMutableString;
-import org.apache.asterix.om.base.AString;
-import org.apache.asterix.om.types.BuiltinType;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference;
-import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-
-@SuppressWarnings("unchecked")
-public class FileIndexTupleTranslator {
-    private final FilesIndexDescription filesIndexDescription = new FilesIndexDescription();
-    private ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(
-            filesIndexDescription.FILE_INDEX_RECORD_DESCRIPTOR.getFieldCount());
-    private RecordBuilder recordBuilder = new RecordBuilder();
-    private ArrayBackedValueStorage fieldValue = new ArrayBackedValueStorage();
-    private AMutableInt32 aInt32 = new AMutableInt32(0);
-    private AMutableInt64 aInt64 = new AMutableInt64(0);
-    private AMutableString aString = new AMutableString(null);
-    private AMutableDateTime aDateTime = new AMutableDateTime(0);
-    private ISerializerDeserializer<AString> stringSerde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.ASTRING);
-    private ISerializerDeserializer<ADateTime> dateTimeSerde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.ADATETIME);
-    private ISerializerDeserializer<AInt64> longSerde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.AINT64);
-    private ArrayTupleReference tuple = new ArrayTupleReference();
-
-    public ITupleReference getTupleFromFile(ExternalFile file) throws IOException, AsterixException {
-        tupleBuilder.reset();
-        //File Number
-        aInt32.setValue(file.getFileNumber());
-        filesIndexDescription.FILE_INDEX_RECORD_DESCRIPTOR.getFields()[0].serialize(aInt32,
-                tupleBuilder.getDataOutput());
-        tupleBuilder.addFieldEndOffset();
-
-        //File Record
-        recordBuilder.reset(filesIndexDescription.EXTERNAL_FILE_RECORD_TYPE);
-        // write field 0 (File Name)
-        fieldValue.reset();
-        aString.setValue(file.getFileName());
-        stringSerde.serialize(aString, fieldValue.getDataOutput());
-        recordBuilder.addField(0, fieldValue);
-
-        //write field 1 (File Size)
-        fieldValue.reset();
-        aInt64.setValue(file.getSize());
-        longSerde.serialize(aInt64, fieldValue.getDataOutput());
-        recordBuilder.addField(1, fieldValue);
-
-        //write field 2 (File Mod Date)
-        fieldValue.reset();
-        aDateTime.setValue(file.getLastModefiedTime().getTime());
-        dateTimeSerde.serialize(aDateTime, fieldValue.getDataOutput());
-        recordBuilder.addField(2, fieldValue);
-
-        //write the record
-        recordBuilder.write(tupleBuilder.getDataOutput(), true);
-        tupleBuilder.addFieldEndOffset();
-        tuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());
-        return tuple;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSIndexingParserFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSIndexingParserFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSIndexingParserFactory.java
deleted file mode 100644
index b38b835..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSIndexingParserFactory.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.dataflow;
-
-import java.util.Map;
-
-import org.apache.asterix.external.adapter.factory.HDFSAdapterFactory;
-import org.apache.asterix.external.adapter.factory.HDFSIndexingAdapterFactory;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.runtime.operators.file.ADMDataParser;
-import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory;
-import org.apache.asterix.runtime.operators.file.DelimitedDataParser;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.std.file.ITupleParser;
-import org.apache.hyracks.dataflow.std.file.ITupleParserFactory;
-
-/**
- * This is the parser factory for parsers used to do indexing
- */
-public class HDFSIndexingParserFactory implements ITupleParserFactory {
-
-    private static final long serialVersionUID = 1L;
-    // file input-format <text, seq, rc>
-    private final String inputFormat;
-    // content format <adm, delimited-text, binary>
-    private final String format;
-    // delimiter in case of delimited text
-    private final char delimiter;
-    // quote in case of delimited text
-    private final char quote;
-    // parser class name in case of binary format
-    private final String parserClassName;
-    // the expected data type
-    private final ARecordType atype;
-    // the hadoop job conf
-    private transient JobConf jobConf;
-    // adapter arguments
-    private Map<String, String> arguments;
-
-    public HDFSIndexingParserFactory(ARecordType atype, String inputFormat, String format, char delimiter, char quote,
-            String parserClassName) {
-        this.inputFormat = inputFormat;
-        this.format = format;
-        this.parserClassName = parserClassName;
-        this.delimiter = delimiter;
-        this.quote = quote;
-        this.atype = atype;
-    }
-
-    @Override
-    public ITupleParser createTupleParser(IHyracksCommonContext ctx) throws HyracksDataException {
-        if (format == null) {
-            throw new IllegalArgumentException("Unspecified data format");
-        }
-        if (inputFormat == null) {
-            throw new IllegalArgumentException("Unspecified data format");
-        }
-        if (!inputFormat.equalsIgnoreCase(HDFSAdapterFactory.INPUT_FORMAT_RC)
-                && !inputFormat.equalsIgnoreCase(HDFSAdapterFactory.INPUT_FORMAT_TEXT)
-                && !inputFormat.equalsIgnoreCase(HDFSAdapterFactory.INPUT_FORMAT_SEQUENCE)) {
-            throw new IllegalArgumentException("External Indexing not supportd for format " + inputFormat);
-        }
-        // Do some real work here
-        /*
-         * Choices are:
-         * 1. TxtOrSeq (Object) indexing tuple parser
-         * 2. RC indexing tuple parser
-         * 3. textual data tuple parser
-         */
-        if (format.equalsIgnoreCase(AsterixTupleParserFactory.FORMAT_ADM)) {
-            // choice 3 with adm data parser
-            ADMDataParser dataParser = new ADMDataParser();
-            return new AdmOrDelimitedIndexingTupleParser(ctx, atype, dataParser);
-        } else if (format.equalsIgnoreCase(AsterixTupleParserFactory.FORMAT_DELIMITED_TEXT)) {
-            // choice 3 with delimited data parser
-            DelimitedDataParser dataParser = HDFSIndexingAdapterFactory.getDelimitedDataParser(atype, delimiter, quote);
-            return new AdmOrDelimitedIndexingTupleParser(ctx, atype, dataParser);
-        }
-
-        // binary data with a special parser --> create the parser
-        IAsterixHDFSRecordParser objectParser;
-        if (parserClassName.equalsIgnoreCase(HDFSAdapterFactory.PARSER_HIVE)) {
-            objectParser = new HiveObjectParser();
-        } else {
-            try {
-                objectParser = (IAsterixHDFSRecordParser) Class.forName(parserClassName).newInstance();
-            } catch (Exception e) {
-                throw new HyracksDataException("Unable to create object parser", e);
-            }
-        }
-        try {
-            objectParser.initialize(atype, arguments, jobConf);
-        } catch (Exception e) {
-            throw new HyracksDataException("Unable to initialize object parser", e);
-        }
-
-        if (inputFormat.equalsIgnoreCase(HDFSAdapterFactory.INPUT_FORMAT_RC)) {
-            // Case 2
-            return new RCFileIndexingTupleParser(ctx, atype, objectParser);
-        } else {
-            // Case 1
-            return new TextOrSeqIndexingTupleParser(ctx, atype, objectParser);
-        }
-    }
-
-    public JobConf getJobConf() {
-        return jobConf;
-    }
-
-    public void setJobConf(JobConf jobConf) {
-        this.jobConf = jobConf;
-    }
-
-    public Map<String, String> getArguments() {
-        return arguments;
-    }
-
-    public void setArguments(Map<String, String> arguments) {
-        this.arguments = arguments;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSLookupAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSLookupAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSLookupAdapter.java
deleted file mode 100644
index d9ce7aa..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSLookupAdapter.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.dataflow;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.util.Map;
-
-import org.apache.asterix.external.adapter.factory.HDFSAdapterFactory;
-import org.apache.asterix.external.adapter.factory.HDFSIndexingAdapterFactory;
-import org.apache.asterix.external.dataset.adapter.IControlledAdapter;
-import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
-import org.apache.asterix.external.indexing.input.RCFileLookupReader;
-import org.apache.asterix.external.indexing.input.SequenceFileLookupInputStream;
-import org.apache.asterix.external.indexing.input.SequenceFileLookupReader;
-import org.apache.asterix.external.indexing.input.TextFileLookupInputStream;
-import org.apache.asterix.external.indexing.input.TextFileLookupReader;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.runtime.operators.file.ADMDataParser;
-import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory;
-import org.apache.asterix.runtime.operators.file.DelimitedDataParser;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.INullWriterFactory;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class HDFSLookupAdapter implements IControlledAdapter, Serializable {
-
-    private static final long serialVersionUID = 1L;
-
-    private RecordDescriptor inRecDesc;
-    private boolean propagateInput;
-    private int[] ridFields;
-    private int[] propagatedFields;
-    private IAType atype;
-    private Map<String, String> configuration;
-    private IHyracksTaskContext ctx;
-    private IControlledTupleParser parser;
-    private ExternalFileIndexAccessor fileIndexAccessor;
-    private boolean retainNull;
-
-    public HDFSLookupAdapter(IAType atype, RecordDescriptor inRecDesc, Map<String, String> adapterConfiguration,
-            boolean propagateInput, int[] ridFields, int[] propagatedFields, IHyracksTaskContext ctx,
-            ExternalFileIndexAccessor fileIndexAccessor, boolean retainNull) {
-        this.configuration = adapterConfiguration;
-        this.atype = atype;
-        this.ctx = ctx;
-        this.inRecDesc = inRecDesc;
-        this.propagatedFields = propagatedFields;
-        this.propagateInput = propagateInput;
-        this.propagatedFields = propagatedFields;
-        this.fileIndexAccessor = fileIndexAccessor;
-        this.ridFields = ridFields;
-        this.retainNull = retainNull;
-    }
-
-    /*
-     * This function is not easy to read and could be refactored into a better structure but for now it works
-     */
-    @Override
-    public void initialize(IHyracksTaskContext ctx, INullWriterFactory iNullWriterFactory) throws Exception {
-        JobConf jobConf = HDFSAdapterFactory.configureJobConf(configuration);
-        // Create the lookup reader and the controlled parser
-        if (configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT).equals(HDFSAdapterFactory.INPUT_FORMAT_RC)) {
-            configureRCFile(jobConf, iNullWriterFactory);
-        } else if (configuration.get(AsterixTupleParserFactory.KEY_FORMAT)
-                .equals(AsterixTupleParserFactory.FORMAT_ADM)) {
-            // create an adm parser
-            ADMDataParser dataParser = new ADMDataParser();
-            if (configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT).equals(HDFSAdapterFactory.INPUT_FORMAT_TEXT)) {
-                // Text input format
-                TextFileLookupInputStream in = new TextFileLookupInputStream(fileIndexAccessor, jobConf);
-                parser = new AdmOrDelimitedControlledTupleParser(ctx, (ARecordType) atype, in, propagateInput,
-                        inRecDesc, dataParser, propagatedFields, ridFields, retainNull, iNullWriterFactory);
-            } else {
-                // Sequence input format
-                SequenceFileLookupInputStream in = new SequenceFileLookupInputStream(fileIndexAccessor, jobConf);
-                parser = new AdmOrDelimitedControlledTupleParser(ctx, (ARecordType) atype, in, propagateInput,
-                        inRecDesc, dataParser, propagatedFields, ridFields, retainNull, iNullWriterFactory);
-            }
-        } else if (configuration.get(AsterixTupleParserFactory.KEY_FORMAT)
-                .equals(AsterixTupleParserFactory.FORMAT_DELIMITED_TEXT)) {
-            // create a delimited text parser
-            char delimiter = AsterixTupleParserFactory.getDelimiter(configuration);
-            char quote = AsterixTupleParserFactory.getQuote(configuration, delimiter);
-
-            DelimitedDataParser dataParser = HDFSIndexingAdapterFactory.getDelimitedDataParser((ARecordType) atype,
-                    delimiter, quote);
-            if (configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT).equals(HDFSAdapterFactory.INPUT_FORMAT_TEXT)) {
-                // Text input format
-                TextFileLookupInputStream in = new TextFileLookupInputStream(fileIndexAccessor, jobConf);
-                parser = new AdmOrDelimitedControlledTupleParser(ctx, (ARecordType) atype, in, propagateInput,
-                        inRecDesc, dataParser, propagatedFields, ridFields, retainNull, iNullWriterFactory);
-            } else {
-                // Sequence input format
-                SequenceFileLookupInputStream in = new SequenceFileLookupInputStream(fileIndexAccessor, jobConf);
-                parser = new AdmOrDelimitedControlledTupleParser(ctx, (ARecordType) atype, in, propagateInput,
-                        inRecDesc, dataParser, propagatedFields, ridFields, retainNull, iNullWriterFactory);
-            }
-        } else {
-            configureGenericSeqOrText(jobConf, iNullWriterFactory);
-        }
-    }
-
-    private void configureGenericSeqOrText(JobConf jobConf, INullWriterFactory iNullWriterFactory) throws IOException {
-        if (configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT).equals(HDFSAdapterFactory.INPUT_FORMAT_TEXT)) {
-            // Text input format
-            TextFileLookupReader reader = new TextFileLookupReader(fileIndexAccessor, jobConf);
-            parser = new SeqOrTxtControlledTupleParser(ctx, createRecordParser(jobConf), reader, propagateInput,
-                    propagatedFields, inRecDesc, ridFields, retainNull, iNullWriterFactory);
-        } else {
-            // Sequence input format
-            SequenceFileLookupReader reader = new SequenceFileLookupReader(fileIndexAccessor, jobConf);
-            parser = new SeqOrTxtControlledTupleParser(ctx, createRecordParser(jobConf), reader, propagateInput,
-                    propagatedFields, inRecDesc, ridFields, retainNull, iNullWriterFactory);
-        }
-    }
-
-    @Override
-    public void nextFrame(ByteBuffer buffer, IFrameWriter writer) throws Exception {
-        parser.parseNext(writer, buffer);
-    }
-
-    @Override
-    public void close(IFrameWriter writer) throws Exception {
-        parser.close(writer);
-    }
-
-    @Override
-    public void fail() throws Exception {
-        // Do nothing
-    }
-
-    private void configureRCFile(Configuration jobConf, INullWriterFactory iNullWriterFactory)
-            throws IOException, Exception {
-        // RCFileLookupReader
-        RCFileLookupReader reader = new RCFileLookupReader(fileIndexAccessor,
-                HDFSAdapterFactory.configureJobConf(configuration));
-        parser = new RCFileControlledTupleParser(ctx, createRecordParser(jobConf), reader, propagateInput,
-                propagatedFields, inRecDesc, ridFields, retainNull, iNullWriterFactory);
-    }
-
-    private IAsterixHDFSRecordParser createRecordParser(Configuration jobConf) throws HyracksDataException {
-        // Create the record parser
-        // binary data with a special parser --> create the parser
-        IAsterixHDFSRecordParser objectParser;
-        if (configuration.get(HDFSAdapterFactory.KEY_PARSER).equals(HDFSAdapterFactory.PARSER_HIVE)) {
-            objectParser = new HiveObjectParser();
-        } else {
-            try {
-                objectParser = (IAsterixHDFSRecordParser) Class
-                        .forName(configuration.get(HDFSAdapterFactory.KEY_PARSER)).newInstance();
-            } catch (Exception e) {
-                throw new HyracksDataException("Unable to create object parser", e);
-            }
-        }
-        // initialize the parser
-        try {
-            objectParser.initialize((ARecordType) atype, configuration, jobConf);
-        } catch (Exception e) {
-            throw new HyracksDataException("Unable to initialize object parser", e);
-        }
-
-        return objectParser;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSLookupAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSLookupAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSLookupAdapterFactory.java
deleted file mode 100644
index fab507d..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSLookupAdapterFactory.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.dataflow;
-
-import java.util.Map;
-
-import org.apache.asterix.external.adapter.factory.IControlledAdapterFactory;
-import org.apache.asterix.external.dataset.adapter.IControlledAdapter;
-import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
-import org.apache.asterix.om.types.IAType;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-
-// This class takes care of creating the adapter based on the formats and input format
-public class HDFSLookupAdapterFactory implements IControlledAdapterFactory {
-
-    private static final long serialVersionUID = 1L;
-
-    private Map<String, String> adapterConfiguration;
-    private IAType atype;
-    private boolean propagateInput;
-    private int[] ridFields;
-    private int[] propagatedFields;
-    private boolean retainNull;
-
-    @Override
-    public void configure(IAType atype, boolean propagateInput, int[] ridFields,
-            Map<String, String> adapterConfiguration, boolean retainNull) {
-        this.adapterConfiguration = adapterConfiguration;
-        this.atype = atype;
-        this.propagateInput = propagateInput;
-        this.ridFields = ridFields;
-        this.retainNull = retainNull;
-    }
-
-    @Override
-    public IControlledAdapter createAdapter(IHyracksTaskContext ctx, ExternalFileIndexAccessor fileIndexAccessor,
-            RecordDescriptor inRecDesc) {
-        if (propagateInput) {
-            configurePropagatedFields(inRecDesc);
-        }
-        return new HDFSLookupAdapter(atype, inRecDesc, adapterConfiguration, propagateInput, ridFields,
-                propagatedFields, ctx, fileIndexAccessor, retainNull);
-    }
-
-    private void configurePropagatedFields(RecordDescriptor inRecDesc) {
-        int ptr = 0;
-        boolean skip = false;
-        propagatedFields = new int[inRecDesc.getFieldCount() - ridFields.length];
-        for (int i = 0; i < inRecDesc.getFieldCount(); i++) {
-            if (ptr < ridFields.length) {
-                skip = false;
-                for (int j = 0; j < ridFields.length; j++) {
-                    if (ridFields[j] == i) {
-                        ptr++;
-                        skip = true;
-                        break;
-                    }
-                }
-                if (!skip)
-                    propagatedFields[i - ptr] = i;
-            } else {
-                propagatedFields[i - ptr] = i;
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSObjectTupleParser.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSObjectTupleParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSObjectTupleParser.java
deleted file mode 100644
index f42a6d1..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSObjectTupleParser.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.dataflow;
-
-import java.io.InputStream;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.external.indexing.input.AbstractHDFSReader;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.std.file.ITupleParser;
-
-/*
- * This class is used with hdfs objects instead of hdfs
- */
-public class HDFSObjectTupleParser implements ITupleParser{
-
-    private ArrayTupleBuilder tb;
-    private final FrameTupleAppender appender;
-    private IAsterixHDFSRecordParser deserializer;
-
-    public HDFSObjectTupleParser(IHyracksCommonContext ctx, ARecordType recType, IAsterixHDFSRecordParser deserializer)
-            throws HyracksDataException {
-        appender = new FrameTupleAppender(new VSizeFrame(ctx));
-        this.deserializer = deserializer;
-        tb = new ArrayTupleBuilder(1);
-    }
-
-    @Override
-    public void parse(InputStream in, IFrameWriter writer) throws HyracksDataException {
-        AbstractHDFSReader reader = (AbstractHDFSReader) in;
-        Object object;
-        try {
-            reader.initialize();
-            object = reader.readNext();
-            while (object!= null) {
-                tb.reset();
-                deserializer.parse(object, tb.getDataOutput());
-                tb.addFieldEndOffset();
-                addTupleToFrame(writer);
-                object = reader.readNext();
-            }
-            appender.flush(writer, true);
-        } catch (AsterixException ae) {
-            throw new HyracksDataException(ae);
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
-        }
-    }
-
-    protected void addTupleToFrame(IFrameWriter writer) throws HyracksDataException {
-        if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-            appender.flush(writer, true);
-            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                throw new IllegalStateException();
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSObjectTupleParserFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSObjectTupleParserFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSObjectTupleParserFactory.java
deleted file mode 100644
index ac3a92f..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSObjectTupleParserFactory.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.dataflow;
-
-import java.util.Map;
-
-import org.apache.asterix.external.adapter.factory.HDFSAdapterFactory;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.std.file.ITupleParser;
-import org.apache.hyracks.dataflow.std.file.ITupleParserFactory;
-
-public class HDFSObjectTupleParserFactory implements ITupleParserFactory{
-    private static final long serialVersionUID = 1L;
-    // parser class name in case of binary format
-    private String parserClassName;
-    // the expected data type
-    private ARecordType atype;
-    // the hadoop job conf
-    private HDFSAdapterFactory adapterFactory;
-    // adapter arguments
-    private Map<String,String> arguments;
-
-    public HDFSObjectTupleParserFactory(ARecordType atype, HDFSAdapterFactory adapterFactory, Map<String,String> arguments){
-        this.parserClassName = (String) arguments.get(HDFSAdapterFactory.KEY_PARSER);
-        this.atype = atype;
-        this.arguments = arguments;
-        this.adapterFactory = adapterFactory;
-    }
-
-    @Override
-    public ITupleParser createTupleParser(IHyracksCommonContext ctx) throws HyracksDataException {
-        IAsterixHDFSRecordParser objectParser;
-        if (parserClassName.equals(HDFSAdapterFactory.PARSER_HIVE)) {
-            objectParser = new HiveObjectParser();
-        } else {
-            try {
-                objectParser = (IAsterixHDFSRecordParser) Class.forName(parserClassName).newInstance();
-            } catch (Exception e) {
-                throw new HyracksDataException("Unable to create object parser", e);
-            }
-        }
-        try {
-            objectParser.initialize(atype, arguments, adapterFactory.getJobConf());
-        } catch (Exception e) {
-            throw new HyracksDataException("Unable to initialize object parser", e);
-        }
-
-        return new HDFSObjectTupleParser(ctx, atype, objectParser);
-    }
-
-}


Mime
View raw message