asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject [13/21] incubator-asterixdb git commit: First stage of external data cleanup
Date Sun, 03 Jan 2016 17:41:11 GMT
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalLoopkupOperatorDiscriptor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalLoopkupOperatorDiscriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalLoopkupOperatorDiscriptor.java
deleted file mode 100644
index ca2e7ca..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalLoopkupOperatorDiscriptor.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.operators;
-
-import java.nio.ByteBuffer;
-
-import org.apache.asterix.external.adapter.factory.IControlledAdapterFactory;
-import org.apache.asterix.external.dataset.adapter.IControlledAdapter;
-import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
-import org.apache.asterix.external.indexing.FilesIndexDescription;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
-import org.apache.hyracks.api.dataflow.value.INullWriterFactory;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
-import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
-import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
-import org.apache.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
-import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeDataflowHelper;
-import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeDataflowHelperFactory;
-import org.apache.hyracks.storage.common.IStorageManagerInterface;
-
-/*
- * This operator is intended for using record ids to access data in external sources
- */
-public class ExternalLoopkupOperatorDiscriptor extends AbstractTreeIndexOperatorDescriptor {
-    private static final long serialVersionUID = 1L;
-    private final IControlledAdapterFactory adapterFactory;
-    private final INullWriterFactory iNullWriterFactory;
-
-    public ExternalLoopkupOperatorDiscriptor(IOperatorDescriptorRegistry spec, IControlledAdapterFactory adapterFactory,
-            RecordDescriptor outRecDesc, ExternalBTreeDataflowHelperFactory externalFilesIndexDataFlowHelperFactory,
-            boolean propagateInput, IIndexLifecycleManagerProvider lcManagerProvider,
-            IStorageManagerInterface storageManager, IFileSplitProvider fileSplitProvider, int datasetId,
-            double bloomFilterFalsePositiveRate, ISearchOperationCallbackFactory searchOpCallbackFactory,
-            boolean retainNull, INullWriterFactory iNullWriterFactory) {
-        super(spec, 1, 1, outRecDesc, storageManager, lcManagerProvider, fileSplitProvider,
-                new FilesIndexDescription().EXTERNAL_FILE_INDEX_TYPE_TRAITS,
-                new FilesIndexDescription().FILES_INDEX_COMP_FACTORIES, FilesIndexDescription.BLOOM_FILTER_FIELDS,
-                externalFilesIndexDataFlowHelperFactory, null, propagateInput, retainNull, iNullWriterFactory, null,
-                searchOpCallbackFactory, null);
-        this.adapterFactory = adapterFactory;
-        this.iNullWriterFactory = iNullWriterFactory;
-    }
-
-    @Override
-    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
-            final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
-                    throws HyracksDataException {
-        // Create a file index accessor to be used for files lookup operations
-        // Note that all file index accessors will use partition 0 since we only have 1 files index per NC 
-        final ExternalFileIndexAccessor fileIndexAccessor = new ExternalFileIndexAccessor(
-                (ExternalBTreeDataflowHelper) dataflowHelperFactory.createIndexDataflowHelper(this, ctx, partition),
-                this);
-        return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
-            // The adapter that uses the file index along with the coming tuples to access files in HDFS
-            private final IControlledAdapter adapter = adapterFactory.createAdapter(ctx, fileIndexAccessor,
-                    recordDescProvider.getInputRecordDescriptor(getActivityId(), 0));
-            private boolean indexOpen = false;
-            private boolean writerOpen = false;
-
-            @Override
-            public void open() throws HyracksDataException {
-                //Open the file index accessor here
-                fileIndexAccessor.openIndex();
-                indexOpen = true;
-                try {
-                    adapter.initialize(ctx, iNullWriterFactory);
-                } catch (Throwable th) {
-                    // close the files index
-                    fileIndexAccessor.closeIndex();
-                    throw new HyracksDataException(th);
-                }
-                writerOpen = true;
-                writer.open();
-            }
-
-            @Override
-            public void close() throws HyracksDataException {
-                try {
-                    adapter.close(writer);
-                } catch (Throwable th) {
-                    throw new HyracksDataException(th);
-                } finally {
-                    try {
-                        if (indexOpen) {
-                            //close the file index
-                            fileIndexAccessor.closeIndex();
-                        }
-                    } finally {
-                        if (writerOpen) {
-                            writer.close();
-                        }
-                    }
-                }
-            }
-
-            @Override
-            public void fail() throws HyracksDataException {
-                try {
-                    adapter.fail();
-                } catch (Throwable th) {
-                    throw new HyracksDataException(th);
-                } finally {
-                    if (writerOpen) {
-                        writer.fail();
-                    }
-                }
-            }
-
-            @Override
-            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                try {
-                    adapter.nextFrame(buffer, writer);
-                } catch (Throwable th) {
-                    throw new HyracksDataException(th);
-                }
-            }
-        };
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/IndexInfoOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/IndexInfoOperatorDescriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/IndexInfoOperatorDescriptor.java
deleted file mode 100644
index 6f367d2..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/IndexInfoOperatorDescriptor.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.operators;
-
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.ActivityId;
-import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
-import org.apache.hyracks.api.dataflow.value.INullWriterFactory;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
-import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
-import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
-import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
-import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import org.apache.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
-import org.apache.hyracks.storage.common.IStorageManagerInterface;
-import org.apache.hyracks.storage.common.file.ILocalResourceFactoryProvider;
-
-/*
- * This is a hack used to optain multiple index instances in a single operator and it is not actually used as an operator
- */
-public class IndexInfoOperatorDescriptor implements IIndexOperatorDescriptor{
-
-    private static final long serialVersionUID = 1L;
-    private final IFileSplitProvider fileSplitProvider;
-    private final IStorageManagerInterface storageManager;
-    private final IIndexLifecycleManagerProvider lifecycleManagerProvider;
-    public IndexInfoOperatorDescriptor(IFileSplitProvider fileSplitProvider,IStorageManagerInterface storageManager,
-            IIndexLifecycleManagerProvider lifecycleManagerProvider){
-        this.fileSplitProvider = fileSplitProvider;
-        this.lifecycleManagerProvider = lifecycleManagerProvider;
-        this.storageManager = storageManager;
-        
-    }
-
-    @Override
-    public ActivityId getActivityId() {
-        return null;
-    }
-
-    @Override
-    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
-            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
-        return null;
-    }
-
-    @Override
-    public IFileSplitProvider getFileSplitProvider() {
-        return fileSplitProvider;
-    }
-
-    @Override
-    public IStorageManagerInterface getStorageManager() {
-        return storageManager;
-    }
-
-    @Override
-    public IIndexLifecycleManagerProvider getLifecycleManagerProvider() {
-        return lifecycleManagerProvider;
-    }
-
-    @Override
-    public RecordDescriptor getRecordDescriptor() {
-        return null;
-    }
-
-    @Override
-    public IIndexDataflowHelperFactory getIndexDataflowHelperFactory() {
-        return null;
-    }
-
-    @Override
-    public boolean getRetainInput() {
-        return false;
-    }
-
-    @Override
-    public ISearchOperationCallbackFactory getSearchOpCallbackFactory() {
-        return null;
-    }
-
-    @Override
-    public IModificationOperationCallbackFactory getModificationOpCallbackFactory() {
-        return null;
-    }
-
-    @Override
-    public ITupleFilterFactory getTupleFilterFactory() {
-        return null;
-    }
-
-    @Override
-    public ILocalResourceFactoryProvider getLocalResourceFactoryProvider() {
-        return null;
-    }
-
-    @Override
-    public boolean getRetainNull() {
-        return false;
-    }
-
-    @Override
-    public INullWriterFactory getNullWriterFactory() {
-        return null;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
new file mode 100644
index 0000000..7e9fdcb
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.external.api.IIndexibleExternalDataSource;
+import org.apache.asterix.external.api.IInputStreamProvider;
+import org.apache.asterix.external.api.IInputStreamProviderFactory;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.external.indexing.IndexingScheduler;
+import org.apache.asterix.external.input.record.reader.HDFSRecordReader;
+import org.apache.asterix.external.input.stream.HDFSInputStreamProvider;
+import org.apache.asterix.external.provider.ExternalIndexerProvider;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.external.util.HDFSUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.hdfs.dataflow.ConfFactory;
+import org.apache.hyracks.hdfs.dataflow.InputSplitsFactory;
+import org.apache.hyracks.hdfs.scheduler.Scheduler;
+
+/**
+ * Factory for HDFS-backed external data. Depending on the configuration it serves either input stream providers or
+ * record readers, and it can be restricted to a snapshot of files (optionally for an indexing operation).
+ */
+public class HDFSDataSourceFactory
+        implements IInputStreamProviderFactory, IRecordReaderFactory<Object>, IIndexibleExternalDataSource {
+
+    protected static final long serialVersionUID = 1L;
+    protected transient AlgebricksPartitionConstraint clusterLocations;
+    protected String[] readSchedule;
+    protected boolean read[];
+    protected InputSplitsFactory inputSplitsFactory;
+    protected ConfFactory confFactory;
+    protected boolean configured = false;
+    protected static Scheduler hdfsScheduler;
+    protected static IndexingScheduler indexingScheduler;
+    // volatile: read without a lock in configure(), written under the class lock in initialize()
+    protected static volatile Boolean initialized = false;
+    protected List<ExternalFile> files;
+    protected Map<String, String> configuration;
+    protected Class<?> recordClass;
+    protected boolean indexingOp = false;
+    private JobConf conf;
+    private InputSplit[] inputSplits;
+    private String nodeName;
+
+    /**
+     * Prepares the factory: builds the job configuration, computes the input splits (restricted to the snapshot
+     * files when one was set), derives the read schedule and, for record sources, detects the record class.
+     *
+     * @param configuration
+     *            the data source properties
+     * @throws Exception
+     *             propagated from split computation or from creating the probe record reader
+     */
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        if (!HDFSDataSourceFactory.initialized) {
+            HDFSDataSourceFactory.initialize();
+        }
+        this.configuration = configuration;
+        // NOTE: the locals 'conf' and 'inputSplits' below shadow the private fields of the same name; those fields
+        // are only assigned in createInputStreamProvider()
+        JobConf conf = HDFSUtils.configureHDFSJobConf(configuration);
+        confFactory = new ConfFactory(conf);
+        clusterLocations = getPartitionConstraint();
+        int numPartitions = ((AlgebricksAbsolutePartitionConstraint) clusterLocations).getLocations().length;
+        // if files list was set, we restrict the splits to the list
+        InputSplit[] inputSplits;
+        if (files == null) {
+            inputSplits = conf.getInputFormat().getSplits(conf, numPartitions);
+        } else {
+            inputSplits = HDFSUtils.getSplits(conf, files);
+        }
+        if (indexingOp) {
+            readSchedule = indexingScheduler.getLocationConstraints(inputSplits);
+        } else {
+            readSchedule = hdfsScheduler.getLocationConstraints(inputSplits);
+        }
+        inputSplitsFactory = new InputSplitsFactory(inputSplits);
+        read = new boolean[readSchedule.length];
+        Arrays.fill(read, false);
+        if (!ExternalDataUtils.isDataSourceStreamProvider(configuration)) {
+            // Probe the first split with a throw-away reader to learn the class of the values it produces
+            RecordReader<?, ?> reader = conf.getInputFormat().getRecordReader(inputSplits[0], conf, Reporter.NULL);
+            try {
+                this.recordClass = reader.createValue().getClass();
+            } finally {
+                // release the probe reader even when createValue() throws
+                reader.close();
+            }
+        }
+    }
+
+    // Used to tell the factory to restrict the splits to the intersection between this list and the actual files on hdfs side
+    @Override
+    public void setSnapshot(List<ExternalFile> files, boolean indexingOp) {
+        this.files = files;
+        this.indexingOp = indexingOp;
+    }
+
+    /*
+     * The method below was modified to take care of the following
+     * 1. when target files are not null, it generates a file aware input stream that validate against the files
+     * 2. if the data is binary, it returns a generic reader
+     */
+    @Override
+    public IInputStreamProvider createInputStreamProvider(IHyracksTaskContext ctx, int partition)
+            throws HyracksDataException {
+        try {
+            if (!configured) {
+                conf = confFactory.getConf();
+                inputSplits = inputSplitsFactory.getSplits();
+                nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
+                configured = true;
+            }
+            return new HDFSInputStreamProvider<Object>(read, inputSplits, readSchedule, nodeName, conf, configuration,
+                    files);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    /**
+     * Get the cluster locations for this input stream factory. This method specifies on which asterix nodes the
+     * external adapter will run and how many threads per node.
+     *
+     * @return the partition constraint obtained from {@link HDFSUtils#getPartitionConstraints}
+     */
+    @Override
+    public AlgebricksPartitionConstraint getPartitionConstraint() {
+        clusterLocations = HDFSUtils.getPartitionConstraints(clusterLocations);
+        return clusterLocations;
+    }
+
+    /**
+     * Initializes the schedulers which assign responsibility for reading the different logical input splits from
+     * HDFS. The body runs at most once; later calls return without doing anything.
+     */
+    private static void initialize() {
+        // Lock on the class object rather than on the boxed 'initialized' field: that field refers to a different
+        // Boolean instance once the flag is flipped, so it cannot serve as a stable monitor
+        synchronized (HDFSDataSourceFactory.class) {
+            if (!initialized) {
+                hdfsScheduler = HDFSUtils.initializeHDFSScheduler();
+                indexingScheduler = HDFSUtils.initializeIndexingHDFSScheduler();
+                initialized = true;
+            }
+        }
+    }
+
+    /**
+     * @return the job configuration produced by the configuration factory
+     * @throws HyracksDataException
+     *             propagated from {@code confFactory.getConf()}
+     */
+    public JobConf getJobConf() throws HyracksDataException {
+        return confFactory.getConf();
+    }
+
+    @Override
+    public DataSourceType getDataSourceType() {
+        return (ExternalDataUtils.isDataSourceStreamProvider(configuration)) ? DataSourceType.STREAM
+                : DataSourceType.RECORDS;
+    }
+
+    /**
+     * Creates a record reader over the configured splits. When a snapshot of files was set, the reader is also given
+     * the snapshot and an indexer.
+     */
+    @Override
+    public IRecordReader<? extends Writable> createRecordReader(IHyracksTaskContext ctx, int partition)
+            throws Exception {
+        JobConf conf = confFactory.getConf();
+        InputSplit[] inputSplits = inputSplitsFactory.getSplits();
+        String nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
+        HDFSRecordReader<Object, Writable> recordReader = new HDFSRecordReader<Object, Writable>(read, inputSplits,
+                readSchedule, nodeName, conf);
+        if (files != null) {
+            recordReader.setSnapshot(files);
+            recordReader.setIndexer(ExternalIndexerProvider.getIndexer(configuration));
+        }
+        recordReader.configure(configuration);
+        return recordReader;
+    }
+
+    @Override
+    public Class<?> getRecordClass() {
+        return recordClass;
+    }
+
+    @Override
+    public boolean isIndexible() {
+        return true;
+    }
+
+    @Override
+    public boolean isIndexingOp() {
+        return (files != null && indexingOp);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/CharArrayRecord.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/CharArrayRecord.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/CharArrayRecord.java
new file mode 100644
index 0000000..fd5c397
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/CharArrayRecord.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.util.ExternalDataConstants;
+
+/**
+ * A reusable record backed by a growable {@code char[]} buffer. Only the first {@link #size()} chars of the buffer
+ * belong to the current record; the remainder is spare capacity that may hold leftovers from earlier records.
+ */
+public class CharArrayRecord implements IRawRecord<char[]> {
+
+    // backing buffer, possibly longer than the current record
+    private char[] value;
+    // number of valid chars at the start of the buffer
+    private int size;
+
+    /**
+     * @return a new array with the UTF-8 encoding of the first {@link #size()} chars
+     */
+    @Override
+    public byte[] getBytes() {
+        // Only [0, size) is record content; the charset is explicit so the result does not depend on the platform
+        // default
+        return new String(value, 0, size).getBytes(StandardCharsets.UTF_8);
+    }
+
+    /**
+     * @return the backing buffer itself, not a copy; only the first {@link #size()} chars are valid
+     */
+    @Override
+    public char[] get() {
+        return value;
+    }
+
+    @Override
+    public int size() {
+        return size;
+    }
+
+    public CharArrayRecord(int initialCapacity) {
+        value = new char[initialCapacity];
+        size = 0;
+    }
+
+    public CharArrayRecord() {
+        value = new char[ExternalDataConstants.DEFAULT_BUFFER_SIZE];
+        size = 0;
+    }
+
+    /**
+     * Replaces the record content with {@code length} chars copied from {@code recordBuffer} starting at
+     * {@code offset}. The backing buffer is reallocated only when it is too small.
+     */
+    public void setValue(char[] recordBuffer, int offset, int length) {
+        if (value.length < length) {
+            value = new char[length];
+        }
+        System.arraycopy(recordBuffer, offset, value, 0, length);
+        size = length;
+    }
+
+    // Grows the backing buffer (keeping its content) so that it can hold at least len chars
+    private void ensureCapacity(int len) {
+        if (value.length < len) {
+            // allocate 25% more than requested
+            value = Arrays.copyOf(value, (int) (len * 1.25));
+        }
+    }
+
+    /**
+     * Appends {@code length} chars from {@code recordBuffer}, starting at {@code offset}, to the record.
+     */
+    public void append(char[] recordBuffer, int offset, int length) {
+        ensureCapacity(size + length);
+        System.arraycopy(recordBuffer, offset, value, size, length);
+        size += length;
+    }
+
+    /**
+     * Empties the record; the backing buffer is kept for reuse.
+     */
+    @Override
+    public void reset() {
+        size = 0;
+    }
+
+    /**
+     * @return the first {@link #size()} chars as a string
+     */
+    @Override
+    public String toString() {
+        return String.valueOf(value, 0, size);
+    }
+
+    /**
+     * Swaps in a new backing buffer. The record size is left untouched.
+     * NOTE(review): callers presumably adjust the size separately or call this on a fresh record - confirm.
+     */
+    public void setValue(char[] value) {
+        this.value = value;
+    }
+
+    /**
+     * Terminates the record with {@link ExternalDataConstants#LF} unless it already ends with one. An empty record
+     * ends up holding just that terminator.
+     */
+    public void endRecord() {
+        // size == 0 is checked first so that value[size - 1] is never evaluated with index -1
+        if (size == 0 || value[size - 1] != ExternalDataConstants.LF) {
+            appendChar(ExternalDataConstants.LF);
+        }
+    }
+
+    private void appendChar(char c) {
+        ensureCapacity(size + 1);
+        value[size] = c;
+        size++;
+    }
+
+    @Override
+    public Class<char[]> getRecordClass() {
+        return char[].class;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/GenericRecord.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/GenericRecord.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/GenericRecord.java
new file mode 100644
index 0000000..365bc22
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/GenericRecord.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record;
+
+import org.apache.asterix.external.api.IRawRecord;
+
+/**
+ * A raw record that simply wraps an arbitrary object. It has no byte representation and no meaningful size.
+ */
+public class GenericRecord<T> implements IRawRecord<T> {
+
+    // the wrapped object; null until one is supplied through the constructor or set()
+    private T record;
+
+    /**
+     * Creates a record with no wrapped object; call {@link #set(Object)} before asking for the record class.
+     */
+    public GenericRecord() {
+    }
+
+    public GenericRecord(T record) {
+        this.record = record;
+    }
+
+    /**
+     * @return always {@code null}
+     */
+    @Override
+    public byte[] getBytes() {
+        return null;
+    }
+
+    @Override
+    public T get() {
+        return record;
+    }
+
+    /**
+     * @return always {@code -1}
+     */
+    @Override
+    public int size() {
+        return -1;
+    }
+
+    /**
+     * @return the runtime class of the wrapped object
+     * @throws IllegalStateException
+     *             if no object has been set yet
+     */
+    @Override
+    public Class<?> getRecordClass() {
+        if (record == null) {
+            // fail with an explanation instead of a bare NullPointerException from record.getClass()
+            throw new IllegalStateException("GenericRecord has no wrapped object; call set() first");
+        }
+        return record.getClass();
+    }
+
+    public void set(T record) {
+        this.record = record;
+    }
+
+    /**
+     * Does nothing; the wrapped object is kept.
+     */
+    @Override
+    public void reset() {
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractCharRecordLookupReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractCharRecordLookupReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractCharRecordLookupReader.java
new file mode 100644
index 0000000..1b84e7a
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractCharRecordLookupReader.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CoderResult;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
+import org.apache.asterix.external.indexing.RecordId;
+import org.apache.asterix.external.input.record.CharArrayRecord;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Base class for lookup readers that produce {@code char[]} records. After a subclass has read a record, the bytes
+ * held in {@code value} are decoded as UTF-8 into a reusable {@link CharArrayRecord}.
+ */
+public abstract class AbstractCharRecordLookupReader extends AbstractHDFSLookupRecordReader<char[]> {
+    public AbstractCharRecordLookupReader(ExternalFileIndexAccessor snapshotAccessor, FileSystem fs,
+            Configuration conf) {
+        super(snapshotAccessor, fs, conf);
+    }
+
+    protected CharArrayRecord record = new CharArrayRecord();
+    protected Text value = new Text();
+    protected CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
+    protected ByteBuffer reusableByteBuffer = ByteBuffer.allocateDirect(ExternalDataConstants.DEFAULT_BUFFER_SIZE);
+    protected CharBuffer reusableCharBuffer = CharBuffer.allocate(ExternalDataConstants.DEFAULT_BUFFER_SIZE);
+
+    @Override
+    public Class<?> getRecordClass() throws IOException {
+        return char[].class;
+    }
+
+    /**
+     * Reads the record identified by {@code rid} and returns it decoded. The returned object is reused across calls.
+     */
+    @Override
+    protected IRawRecord<char[]> lookup(RecordId rid) throws IOException {
+        record.reset();
+        readRecord(rid);
+        writeRecord();
+        return record;
+    }
+
+    /**
+     * Reads the record identified by {@code rid}. Implementations are expected to leave the record's bytes in
+     * {@code value}, which is what {@code lookup()} decodes right after this call.
+     */
+    protected abstract void readRecord(RecordId rid) throws IOException;
+
+    /**
+     * Decodes the bytes held in {@code value} and stores the resulting chars in {@code record}, finishing with
+     * {@link CharArrayRecord#endRecord()}.
+     *
+     * @throws IOException
+     *             if the bytes are not well-formed for the decoder's charset
+     */
+    private void writeRecord() throws IOException {
+        reusableByteBuffer.clear();
+        if (reusableByteBuffer.remaining() < value.getLength()) {
+            reusableByteBuffer = ByteBuffer
+                    .allocateDirect(value.getLength() + ExternalDataConstants.DEFAULT_BUFFER_INCREMENT);
+        }
+        reusableByteBuffer.put(value.getBytes(), 0, value.getLength());
+        reusableByteBuffer.flip();
+        // start from a clean decoder and an empty output buffer for every record
+        decoder.reset();
+        reusableCharBuffer.clear();
+        CoderResult result;
+        do {
+            // All bytes of the record are in the buffer, so this is the end of the input: a truncated sequence
+            // is reported as an error instead of leaving bytes that can never be consumed
+            result = decoder.decode(reusableByteBuffer, reusableCharBuffer, true);
+            if (result.isError()) {
+                // without this check a malformed sequence would be retried forever, since the decoder does not
+                // advance past it
+                result.throwException();
+            }
+            drainCharBuffer();
+        } while (result.isOverflow());
+        do {
+            result = decoder.flush(reusableCharBuffer);
+            drainCharBuffer();
+        } while (result.isOverflow());
+        record.endRecord();
+    }
+
+    // Appends whatever the decoder wrote to the char buffer to the record and empties the buffer
+    private void drainCharBuffer() {
+        record.append(reusableCharBuffer.array(), 0, reusableCharBuffer.position());
+        reusableCharBuffer.clear();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractHDFSLookupRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractHDFSLookupRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractHDFSLookupRecordReader.java
new file mode 100644
index 0000000..5a20962
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractHDFSLookupRecordReader.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.asterix.external.api.ILookupRecordReader;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
+import org.apache.asterix.external.indexing.RecordId;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public abstract class AbstractHDFSLookupRecordReader<T> implements ILookupRecordReader<T> {
+
+    protected int fileId;
+    private ExternalFileIndexAccessor snapshotAccessor;
+    protected ExternalFile file;
+    protected FileSystem fs;
+    protected Configuration conf;
+    protected boolean replaced;
+
+    public AbstractHDFSLookupRecordReader(ExternalFileIndexAccessor snapshotAccessor, FileSystem fs,
+            Configuration conf) {
+        this.snapshotAccessor = snapshotAccessor;
+        this.fs = fs;
+        this.conf = conf;
+        this.fileId = -1;
+        this.file = new ExternalFile();
+    }
+
+    @Override
+    public void configure(Map<String, String> configurations) throws Exception {
+    }
+
+    @Override
+    public IRawRecord<T> read(RecordId rid) throws Exception {
+        if (rid.getFileId() != fileId) {
+            // close current file
+            closeFile();
+            // lookup new file
+            snapshotAccessor.lookup(rid.getFileId(), file);
+            fileId = rid.getFileId();
+            try {
+                validate();
+                if (!replaced) {
+                    openFile();
+                    validate();
+                    if (replaced) {
+                        closeFile();
+                    }
+                }
+            } catch (FileNotFoundException e) {
+                replaced = true;
+            }
+        }
+        if (replaced) {
+            return null;
+        }
+        return lookup(rid);
+    }
+
+    protected abstract IRawRecord<T> lookup(RecordId rid) throws IOException;
+
+    private void validate() throws IllegalArgumentException, IOException {
+        FileStatus fileStatus = fs.getFileStatus(new Path(file.getFileName()));
+        replaced = fileStatus.getModificationTime() != file.getLastModefiedTime().getTime();
+    }
+
+    protected abstract void closeFile();
+
+    protected abstract void openFile() throws IllegalArgumentException, IOException;
+
+    @Override
+    public final void open() throws HyracksDataException {
+        snapshotAccessor.open();
+    }
+
+    @Override
+    public void close() throws IOException {
+        try {
+            closeFile();
+        } finally {
+            snapshotAccessor.close();
+        }
+    }
+
+    @Override
+    public void fail() {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractStreamRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractStreamRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractStreamRecordReader.java
new file mode 100644
index 0000000..3b59b98
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractStreamRecordReader.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.asterix.external.api.IExternalIndexer;
+import org.apache.asterix.external.api.IIndexingDatasource;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.input.record.CharArrayRecord;
+import org.apache.asterix.external.input.stream.AInputStream;
+import org.apache.asterix.external.input.stream.AInputStreamReader;
+import org.apache.asterix.external.util.ExternalDataConstants;
+
+/**
+ * Base class for record readers that produce {@code char[]} records from a character stream.
+ * It owns the stream reader, the reusable record and the input buffer; subclasses are expected
+ * to fill {@link #record} before {@link #next()} is called.
+ */
+public abstract class AbstractStreamRecordReader implements IRecordReader<char[]>, IIndexingDatasource {
+    // reader over the input stream; null until setInputStream is called
+    protected AInputStreamReader reader;
+    // reusable record returned by next(); created in configure
+    protected CharArrayRecord record;
+    // character buffer for subclasses; created in configure
+    protected char[] inputBuffer;
+    // buffer bookkeeping for subclasses, both starting at 0
+    protected int bufferLength = 0;
+    protected int bufferPosn = 0;
+    protected IExternalIndexer indexer;
+
+    /**
+     * @return the shared record instance; the same object is returned on every call
+     */
+    @Override
+    public IRawRecord<char[]> next() throws IOException {
+        return record;
+    }
+
+    /**
+     * Closes the underlying stream reader. Safe to call when no input stream was ever set.
+     */
+    @Override
+    public void close() throws IOException {
+        if (reader != null) {
+            reader.close();
+        }
+    }
+
+    /**
+     * Wraps {@code inputStream} in the reader that this record reader consumes.
+     *
+     * @param inputStream
+     *            the stream to read records from
+     */
+    public void setInputStream(AInputStream inputStream) throws IOException {
+        this.reader = new AInputStreamReader(inputStream);
+    }
+
+    /**
+     * @return {@code char[].class}
+     */
+    @Override
+    public Class<char[]> getRecordClass() {
+        return char[].class;
+    }
+
+    /**
+     * Allocates the reusable record and the input buffer; {@code configuration} itself is not
+     * read by this base implementation.
+     */
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        record = new CharArrayRecord();
+        inputBuffer = new char[ExternalDataConstants.DEFAULT_BUFFER_SIZE];
+    }
+
+    @Override
+    public IExternalIndexer getIndexer() {
+        return indexer;
+    }
+
+    @Override
+    public void setIndexer(IExternalIndexer indexer) {
+        this.indexer = indexer;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractStreamRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractStreamRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractStreamRecordReaderFactory.java
new file mode 100644
index 0000000..c7acb1a
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractStreamRecordReaderFactory.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.external.api.IExternalIndexer;
+import org.apache.asterix.external.api.IIndexibleExternalDataSource;
+import org.apache.asterix.external.api.IIndexingDatasource;
+import org.apache.asterix.external.api.IInputStreamProvider;
+import org.apache.asterix.external.api.IInputStreamProviderFactory;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+public abstract class AbstractStreamRecordReaderFactory<T>
+        implements IRecordReaderFactory<T>, IIndexibleExternalDataSource {
+
+    private static final long serialVersionUID = 1L;
+    protected IInputStreamProviderFactory inputStreamFactory;
+    protected Map<String, String> configuration;
+
+    public AbstractStreamRecordReaderFactory<T> setInputStreamFactoryProvider(
+            IInputStreamProviderFactory inputStreamFactory) {
+        this.inputStreamFactory = inputStreamFactory;
+        return this;
+    }
+
+    @Override
+    public DataSourceType getDataSourceType() {
+        return DataSourceType.RECORDS;
+    }
+
+    @Override
+    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+        return inputStreamFactory.getPartitionConstraint();
+    }
+
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        this.configuration = configuration;
+        inputStreamFactory.configure(configuration);
+        configureStreamReaderFactory(configuration);
+    }
+
+    protected abstract void configureStreamReaderFactory(Map<String, String> configuration) throws Exception;
+
+    @Override
+    public boolean isIndexible() {
+        return inputStreamFactory.isIndexible();
+    }
+
+    @Override
+    public void setSnapshot(List<ExternalFile> files, boolean indexingOp) throws Exception {
+        ((IIndexibleExternalDataSource) inputStreamFactory).setSnapshot(files, indexingOp);
+    }
+
+    @Override
+    public boolean isIndexingOp() {
+        if (inputStreamFactory.isIndexible()) {
+            return ((IIndexibleExternalDataSource) inputStreamFactory).isIndexingOp();
+        }
+        return false;
+    }
+
+    protected IRecordReader<char[]> configureReader(AbstractStreamRecordReader recordReader, IHyracksTaskContext ctx,
+            int partition) throws Exception {
+        IInputStreamProvider inputStreamProvider = inputStreamFactory.createInputStreamProvider(ctx, partition);
+        IExternalIndexer indexer = null;
+        if (inputStreamFactory.isIndexible()) {
+            if (((IIndexibleExternalDataSource) inputStreamFactory).isIndexingOp()) {
+                indexer = ((IIndexingDatasource) inputStreamProvider).getIndexer();
+            }
+        }
+        recordReader.setInputStream(inputStreamProvider.getInputStream());
+        recordReader.setIndexer(indexer);
+        recordReader.configure(configuration);
+        return recordReader;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/EmptyRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/EmptyRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/EmptyRecordReader.java
new file mode 100644
index 0000000..e742b1e
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/EmptyRecordReader.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.RecordReader;
+
+/**
+ * A {@link RecordReader} that never yields a record: it reports no next entry, creates no key
+ * or value objects, and stays at position and progress zero.
+ *
+ * @param <K>
+ *            key type
+ * @param <V>
+ *            value type
+ */
+public class EmptyRecordReader<K, V> implements RecordReader<K, V> {
+
+    /** @return always {@code false}; there is nothing to read */
+    @Override
+    public boolean next(K key, V value) throws IOException {
+        return false;
+    }
+
+    /** @return always {@code null} */
+    @Override
+    public K createKey() {
+        return null;
+    }
+
+    /** @return always {@code null} */
+    @Override
+    public V createValue() {
+        return null;
+    }
+
+    /** @return always {@code 0} */
+    @Override
+    public long getPos() throws IOException {
+        return 0L;
+    }
+
+    /** Nothing to release. */
+    @Override
+    public void close() throws IOException {
+    }
+
+    /** @return always {@code 0} */
+    @Override
+    public float getProgress() throws IOException {
+        return 0.0f;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/HDFSRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/HDFSRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/HDFSRecordReader.java
new file mode 100644
index 0000000..d88f967
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/HDFSRecordReader.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.external.api.IExternalIndexer;
+import org.apache.asterix.external.api.IIndexingDatasource;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.external.input.record.GenericRecord;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Record reader over a set of Hadoop input splits. It walks the splits scheduled to this node,
+ * claims each unread one, and exposes the values produced by the split's Hadoop
+ * {@link RecordReader} as {@link IRawRecord}s.
+ *
+ * @param <K>
+ *            key type of the Hadoop record reader
+ * @param <V>
+ *            value type of the Hadoop record reader
+ */
+public class HDFSRecordReader<K, V extends Writable> implements IRecordReader<Writable>, IIndexingDatasource {
+
+    protected RecordReader<K, Writable> reader;
+    protected V value = null;
+    protected K key = null;
+    protected int currentSplitIndex = 0;
+    // per-split "already claimed" flags; also used as the lock when claiming a split
+    protected boolean[] read;
+    protected InputFormat<?, ?> inputFormat;
+    protected InputSplit[] inputSplits;
+    // readSchedule[i] is the name of the node that split i is scheduled to
+    protected String[] readSchedule;
+    protected String nodeName;
+    protected JobConf conf;
+    protected GenericRecord<Writable> record;
+    // Indexing variables
+    protected IExternalIndexer indexer;
+    protected List<ExternalFile> snapshot;
+    protected FileSystem hdfs;
+
+    /**
+     * Creates a reader that starts out on an {@link EmptyRecordReader}; the first real split is
+     * selected in {@link #configure(Map)}.
+     */
+    public HDFSRecordReader(boolean[] read, InputSplit[] inputSplits, String[] readSchedule, String nodeName,
+            JobConf conf) {
+        this.read = read;
+        this.inputSplits = inputSplits;
+        this.readSchedule = readSchedule;
+        this.nodeName = nodeName;
+        this.conf = conf;
+        this.inputFormat = conf.getInputFormat();
+        this.reader = new EmptyRecordReader<K, Writable>();
+    }
+
+    /** Closes the Hadoop record reader of the current split. */
+    @Override
+    public void close() throws IOException {
+        reader.close();
+    }
+
+    /** Creates the reusable record and moves to the first split this node should read. */
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        record = new GenericRecord<Writable>();
+        nextInputSplit();
+    }
+
+    /**
+     * Advances to the next value, moving on to further splits when the current one is exhausted.
+     *
+     * @return {@code true} if a value was read into the reusable value object
+     */
+    @Override
+    public boolean hasNext() throws Exception {
+        do {
+            if (reader.next(key, value)) {
+                return true;
+            }
+        } while (nextInputSplit());
+        return false;
+    }
+
+    /** @return the reusable record, set to the value read by the last {@link #hasNext()} */
+    @Override
+    public IRawRecord<Writable> next() throws IOException {
+        record.set(value);
+        return record;
+    }
+
+    /**
+     * @return the class of the value object, or {@code null} when no value object exists yet and
+     *         no split is left to obtain one from
+     */
+    @Override
+    public Class<? extends Writable> getRecordClass() throws IOException {
+        if (value == null && !nextInputSplit()) {
+            return null;
+        }
+        return value.getClass();
+    }
+
+    /**
+     * Moves to the next split that this node should read and opens a record reader on it.
+     * {@code currentSplitIndex} is left pointing at the selected split.
+     *
+     * @return {@code false} when no further split is available
+     */
+    private boolean nextInputSplit() throws IOException {
+        while (currentSplitIndex < inputSplits.length) {
+            if (claimCurrentSplit()) {
+                reader.close();
+                reader = getRecordReader(currentSplitIndex);
+                return true;
+            }
+            currentSplitIndex++;
+        }
+        return false;
+    }
+
+    /**
+     * Decides whether the split at {@code currentSplitIndex} is to be read by this reader. A
+     * split qualifies when it is scheduled to this node, has not been claimed before and, if a
+     * files snapshot is set, its file still has the modification time stored in the snapshot.
+     * The split is marked as read as soon as it is claimed, even if the snapshot check then
+     * rejects it.
+     */
+    private boolean claimCurrentSplit() throws IOException {
+        // only partitions scheduled to the current node are read
+        if (!readSchedule[currentSplitIndex].equals(nodeName)) {
+            return false;
+        }
+        // claim the split; synchronizes simultaneous partitions on the same machine
+        synchronized (read) {
+            if (read[currentSplitIndex]) {
+                return false;
+            }
+            read[currentSplitIndex] = true;
+        }
+        if (snapshot == null) {
+            return true;
+        }
+        String fileName = ((FileSplit) (inputSplits[currentSplitIndex])).getPath().toUri().getPath();
+        FileStatus fileStatus = hdfs.getFileStatus(new Path(fileName));
+        // Skip if not the same file stored in the files snapshot
+        long snapshotTime = snapshot.get(currentSplitIndex).getLastModefiedTime().getTime();
+        return fileStatus.getModificationTime() == snapshotTime;
+    }
+
+    /**
+     * Opens a Hadoop record reader on the given split, creates the reusable key and value
+     * objects on first use and resets the indexer, if any. The {@code reader} field is assigned
+     * before the indexer is reset, so the indexer sees the new reader through
+     * {@link #getReader()}.
+     */
+    @SuppressWarnings("unchecked")
+    private RecordReader<K, Writable> getRecordReader(int splitIndex) throws IOException {
+        InputSplit split = inputSplits[splitIndex];
+        reader = (RecordReader<K, Writable>) inputFormat.getRecordReader(split, conf, Reporter.NULL);
+        if (key == null) {
+            key = reader.createKey();
+            value = (V) reader.createValue();
+        }
+        if (indexer == null) {
+            return reader;
+        }
+        try {
+            indexer.reset(this);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+        return reader;
+    }
+
+    /** @return always {@code false} */
+    @Override
+    public boolean stop() {
+        return false;
+    }
+
+    @Override
+    public IExternalIndexer getIndexer() {
+        return indexer;
+    }
+
+    @Override
+    public void setIndexer(IExternalIndexer indexer) {
+        this.indexer = indexer;
+    }
+
+    public List<ExternalFile> getSnapshot() {
+        return snapshot;
+    }
+
+    /** Stores the files snapshot and obtains the file system used to check the files against it. */
+    public void setSnapshot(List<ExternalFile> snapshot) throws IOException {
+        this.snapshot = snapshot;
+        hdfs = FileSystem.get(conf);
+    }
+
+    public int getCurrentSplitIndex() {
+        return currentSplitIndex;
+    }
+
+    public RecordReader<K, Writable> getReader() {
+        return reader;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/HDFSTextLineReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/HDFSTextLineReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/HDFSTextLineReader.java
new file mode 100644
index 0000000..9466a96
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/HDFSTextLineReader.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Buffered line reader over an {@link FSDataInputStream} that additionally supports seeking to
+ * an absolute file position, reusing the bytes already buffered when the target lies inside
+ * the buffer.
+ */
+public class HDFSTextLineReader {
+    private static final int DEFAULT_BUFFER_SIZE = 32 * 1024;
+    private int bufferSize = DEFAULT_BUFFER_SIZE;
+    // underlying stream; null until one is attached when the no-arg constructor was used
+    private FSDataInputStream reader;
+
+    private byte[] buffer;
+    // the number of bytes of real data in the buffer
+    private int bufferLength = 0;
+    // the current position in the buffer
+    private int bufferPosn = 0;
+
+    // file offset that corresponds to bufferPosn
+    private long currentFilePos = 0L;
+
+    private static final byte CR = '\r';
+    private static final byte LF = '\n';
+
+    public static final String KEY_BUFFER_SIZE = "io.file.buffer.size";
+
+    /**
+     * Create a line reader that reads from the given stream using the
+     * default buffer-size (32k).
+     *
+     * @param in
+     *            The input stream
+     * @throws IOException
+     */
+    public HDFSTextLineReader(FSDataInputStream in) throws IOException {
+        this(in, DEFAULT_BUFFER_SIZE);
+    }
+
+    /**
+     * Create a line reader that reads from the given stream using the
+     * given buffer-size.
+     *
+     * @param in
+     *            The input stream
+     * @param bufferSize
+     *            Size of the read buffer
+     * @throws IOException
+     */
+    public HDFSTextLineReader(FSDataInputStream in, int bufferSize) throws IOException {
+        this.reader = in;
+        this.bufferSize = bufferSize;
+        this.buffer = new byte[this.bufferSize];
+        currentFilePos = in.getPos();
+    }
+
+    /**
+     * Create a line reader with the default buffer-size and no stream attached. A stream must
+     * be supplied through {@link #resetReader(FSDataInputStream)} before reading or seeking.
+     *
+     * @throws IOException
+     */
+    public HDFSTextLineReader() throws IOException {
+        this.bufferSize = DEFAULT_BUFFER_SIZE;
+        this.buffer = new byte[this.bufferSize];
+    }
+
+    /**
+     * Create a line reader that reads from the given stream using the <code>io.file.buffer.size</code> specified in the given <code>Configuration</code>.
+     *
+     * @param in
+     *            input stream
+     * @param conf
+     *            configuration
+     * @throws IOException
+     */
+    public HDFSTextLineReader(FSDataInputStream in, Configuration conf) throws IOException {
+        this(in, conf.getInt(KEY_BUFFER_SIZE, DEFAULT_BUFFER_SIZE));
+    }
+
+    /**
+     * Read one line from the InputStream into the given Text. A line
+     * can be terminated by one of the following: '\n' (LF) , '\r' (CR),
+     * or '\r\n' (CR+LF). EOF also terminates an otherwise unterminated
+     * line.
+     *
+     * @param str
+     *            the object to store the given line (without newline)
+     * @param maxLineLength
+     *            the maximum number of bytes to store into str;
+     *            the rest of the line is silently discarded.
+     * @param maxBytesToConsume
+     *            the maximum number of bytes to consume
+     *            in this call. This is only a hint, because if the line cross
+     *            this threshold, we allow it to happen. It can overshoot
+     *            potentially by as much as one buffer length.
+     * @return the number of bytes read including the (longest) newline
+     *         found.
+     * @throws IOException
+     *             if the underlying stream throws
+     */
+    public int readLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException {
+        /* We're reading data from in, but the head of the stream may be
+         * already buffered in buffer, so we have several cases:
+         * 1. No newline characters are in the buffer, so we need to copy
+         *    everything and read another buffer from the stream.
+         * 2. An unambiguously terminated line is in buffer, so we just
+         *    copy to str.
+         * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
+         *    in CR.  In this case we copy everything up to CR to str, but
+         *    we also need to see what follows CR: if it's LF, then we
+         *    need consume LF as well, so next call to readLine will read
+         *    from after that.
+         * We use a flag prevCharCR to signal if previous character was CR
+         * and, if it happens to be at the end of the buffer, delay
+         * consuming it until we have a chance to look at the char that
+         * follows.
+         */
+        str.clear();
+        int txtLength = 0; //tracks str.getLength(), as an optimization
+        int newlineLength = 0; //length of terminating newline
+        boolean prevCharCR = false; //true of prev char was CR
+        long bytesConsumed = 0;
+        do {
+            int startPosn = bufferPosn; //starting from where we left off the last time
+            if (bufferPosn >= bufferLength) {
+                startPosn = bufferPosn = 0;
+                if (prevCharCR) {
+                    ++bytesConsumed; //account for CR from previous read
+                }
+                bufferLength = reader.read(buffer);
+                if (bufferLength <= 0) {
+                    // EOF: record an empty buffer (read returns -1) so that the file
+                    // position computed below is not shifted by the end-of-stream marker
+                    bufferLength = 0;
+                    break;
+                }
+            }
+            for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
+                if (buffer[bufferPosn] == LF) {
+                    newlineLength = (prevCharCR) ? 2 : 1;
+                    ++bufferPosn; // at next invocation proceed from following byte
+                    break;
+                }
+                if (prevCharCR) { //CR + notLF, we are at notLF
+                    newlineLength = 1;
+                    break;
+                }
+                prevCharCR = (buffer[bufferPosn] == CR);
+            }
+            int readLength = bufferPosn - startPosn;
+            if (prevCharCR && newlineLength == 0) {
+                --readLength; //CR at the end of the buffer
+            }
+            bytesConsumed += readLength;
+            int appendLength = readLength - newlineLength;
+            if (appendLength > maxLineLength - txtLength) {
+                appendLength = maxLineLength - txtLength;
+            }
+            if (appendLength > 0) {
+                str.append(buffer, startPosn, appendLength);
+                txtLength += appendLength;
+            }
+        } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);
+
+        if (bytesConsumed > Integer.MAX_VALUE) {
+            throw new IOException("Too many bytes before newline: " + bytesConsumed);
+        }
+        // the stream is positioned at the end of the buffered data; step back to bufferPosn
+        currentFilePos = reader.getPos() - bufferLength + bufferPosn;
+        return (int) bytesConsumed;
+    }
+
+    /**
+     * Read from the InputStream into the given Text.
+     *
+     * @param str
+     *            the object to store the given line
+     * @param maxLineLength
+     *            the maximum number of bytes to store into str.
+     * @return the number of bytes read including the newline
+     * @throws IOException
+     *             if the underlying stream throws
+     */
+    public int readLine(Text str, int maxLineLength) throws IOException {
+        return readLine(str, maxLineLength, Integer.MAX_VALUE);
+    }
+
+    /**
+     * Read from the InputStream into the given Text.
+     *
+     * @param str
+     *            the object to store the given line
+     * @return the number of bytes read including the newline
+     * @throws IOException
+     *             if the underlying stream throws
+     */
+    public int readLine(Text str) throws IOException {
+        return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
+    }
+
+    /**
+     * Position the reader so that the next line is read starting at the given file offset.
+     * When the offset lies inside the data already buffered, only the buffer position is
+     * advanced; otherwise the stream is repositioned and the buffer is discarded.
+     *
+     * @param desired
+     *            absolute file offset to continue reading from
+     * @throws IOException
+     *             if the underlying stream throws
+     */
+    public void seek(long desired) throws IOException {
+        if (reader.getPos() <= desired || currentFilePos > desired) {
+            // desired position is ahead of stream or before the current position, seek to position
+            reader.seek(desired);
+            bufferLength = 0;
+            bufferPosn = 0;
+            currentFilePos = desired;
+        } else if (currentFilePos < desired) {
+            // desired position is in the buffer
+            int difference = (int) (desired - currentFilePos);
+            bufferPosn += difference;
+            currentFilePos = desired;
+        }
+    }
+
+    /**
+     * @return the underlying stream, or {@code null} if none has been attached yet
+     */
+    public FSDataInputStream getReader() {
+        return reader;
+    }
+
+    /**
+     * Attach a new stream, discarding any buffered data. The previous stream is not closed.
+     *
+     * @param reader
+     *            the stream to read from
+     * @throws IOException
+     *             if the stream position cannot be obtained
+     */
+    public void resetReader(FSDataInputStream reader) throws IOException {
+        this.reader = reader;
+        bufferLength = 0;
+        bufferPosn = 0;
+        currentFilePos = reader.getPos();
+    }
+
+    /**
+     * Close the underlying stream. Does nothing when no stream has been attached.
+     *
+     * @throws IOException
+     *             if closing the stream fails
+     */
+    public void close() throws IOException {
+        if (reader != null) {
+            reader.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/LineRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/LineRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/LineRecordReader.java
new file mode 100644
index 0000000..9b11df6
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/LineRecordReader.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
+
+/**
+ * A stream record reader that splits its input into records at line terminators
+ * (LF, CR, or CR followed by LF).
+ */
+public class LineRecordReader extends AbstractStreamRecordReader {
+
+    protected boolean prevCharCR;
+    protected int newlineLength;
+    protected int recordNumber = 0;
+
+    /**
+     * Reads the next line of input into {@code record}.
+     * <p>
+     * Every call resets {@code record} and consumes input, so this method is not idempotent.
+     *
+     * @return {@code true} if a record was read, {@code false} if the input is exhausted
+     * @throws IOException
+     *             if reading from the underlying stream fails
+     */
+    @Override
+    public boolean hasNext() throws IOException {
+        /* We're reading data from in, but the head of the stream may be
+         * already buffered in buffer, so we have several cases:
+         * 1. No newline characters are in the buffer, so we need to copy
+         *    everything and read another buffer from the stream.
+         * 2. An unambiguously terminated line is in buffer, so we just
+         *    copy to record.
+         * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
+         *    in CR.  In this case we copy everything up to CR to record, but
+         *    we also need to see what follows CR: if it's LF, then we
+         *    need to consume LF as well, so the next call will read
+         *    from after that.
+         * We use a flag prevCharCR to signal if previous character was CR
+         * and, if it happens to be at the end of the buffer, delay
+         * consuming it until we have a chance to look at the char that
+         * follows.
+         */
+        newlineLength = 0; //length of terminating newline
+        prevCharCR = false; //true if prev char was CR
+        record.reset();
+        int readLength = 0; //bytes taken from the current buffer chunk
+        int recordLength = 0; //bytes appended to record across all buffer refills
+        do {
+            int startPosn = bufferPosn; //starting from where we left off the last time
+            if (bufferPosn >= bufferLength) {
+                startPosn = bufferPosn = 0;
+                bufferLength = reader.read(inputBuffer);
+                if (bufferLength <= 0) {
+                    // EOF. Decide on the total appended so far, not on the last chunk only:
+                    // the last chunk may have contributed nothing (e.g. it held just a deferred
+                    // CR) while earlier chunks already filled the record.
+                    if (recordLength > 0) {
+                        record.endRecord();
+                        recordNumber++;
+                        return true;
+                    }
+                    reader.close();
+                    return false; //EOF
+                }
+            }
+            for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
+                if (inputBuffer[bufferPosn] == ExternalDataConstants.LF) {
+                    newlineLength = (prevCharCR) ? 2 : 1;
+                    ++bufferPosn; // at next invocation proceed from following byte
+                    break;
+                }
+                if (prevCharCR) { //CR + notLF, we are at notLF
+                    newlineLength = 1;
+                    break;
+                }
+                prevCharCR = (inputBuffer[bufferPosn] == ExternalDataConstants.CR);
+            }
+            readLength = bufferPosn - startPosn;
+            if (prevCharCR && newlineLength == 0) {
+                --readLength; //CR at the end of the buffer
+            }
+            if (readLength > 0) {
+                record.append(inputBuffer, startPosn, readLength);
+                recordLength += readLength;
+            }
+        } while (newlineLength == 0);
+        recordNumber++;
+        return true;
+    }
+
+    /**
+     * @return always {@code false}
+     */
+    @Override
+    public boolean stop() {
+        return false;
+    }
+
+    /**
+     * Configures the reader and, when the configuration declares a header, reads and discards
+     * the first record.
+     *
+     * @param configuration
+     *            the reader configuration
+     * @throws Exception
+     *             if the parent configuration or the header read fails
+     */
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        super.configure(configuration);
+        if (ExternalDataUtils.hasHeader(configuration)) {
+            if (hasNext()) {
+                next();
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/LookupReaderFactoryProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/LookupReaderFactoryProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/LookupReaderFactoryProvider.java
new file mode 100644
index 0000000..3a82a68
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/LookupReaderFactoryProvider.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader;
+
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.ILookupReaderFactory;
+import org.apache.asterix.external.input.record.reader.factory.HDFSLookupReaderFactory;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.HDFSUtils;
+
+/**
+ * Creates lookup reader factories based on the input format named in a configuration.
+ */
+public class LookupReaderFactoryProvider {
+
+    /**
+     * Builds and configures the lookup reader factory for the configured input format.
+     *
+     * @param configuration
+     *            the configuration from which the input format class name is obtained
+     * @return a configured lookup reader factory
+     * @throws Exception
+     *             if the input format is not one of the text, sequence or RC input formats, or if
+     *             configuring the factory fails
+     */
+    @SuppressWarnings("rawtypes")
+    public static ILookupReaderFactory getLookupReaderFactory(Map<String, String> configuration) throws Exception {
+        final String formatClassName = HDFSUtils.getInputFormatClassName(configuration);
+        final boolean supported = formatClassName.equals(ExternalDataConstants.CLASS_NAME_TEXT_INPUT_FORMAT)
+                || formatClassName.equals(ExternalDataConstants.CLASS_NAME_SEQUENCE_INPUT_FORMAT)
+                || formatClassName.equals(ExternalDataConstants.CLASS_NAME_RC_INPUT_FORMAT);
+        if (!supported) {
+            throw new AsterixException("Unrecognized external format");
+        }
+        // every recognized format is served by the same HDFS lookup reader factory
+        HDFSLookupReaderFactory<Object> factory = new HDFSLookupReaderFactory<Object>();
+        factory.configure(configuration);
+        return factory;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/QuotedLineRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/QuotedLineRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/QuotedLineRecordReader.java
new file mode 100644
index 0000000..668876e
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/QuotedLineRecordReader.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataExceptionUtils;
+
+/**
+ * A line record reader that does not end a record at a line terminator found between a pair of
+ * unescaped quote characters.
+ */
+public class QuotedLineRecordReader extends LineRecordReader {
+
+    private char quote;
+    private boolean prevCharEscape;
+    private boolean inQuote;
+
+    /**
+     * Reads the single-character quote from the configuration, then configures the parent reader.
+     *
+     * @param configuration
+     *            the reader configuration; must map {@code ExternalDataConstants.KEY_QUOTE} to a
+     *            string of length one
+     * @throws Exception
+     *             if the quote parameter is missing or not exactly one character, or if the
+     *             parent configuration fails
+     */
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        // The quote must be set before super.configure() runs: when the input has a header,
+        // LineRecordReader.configure() calls hasNext(), which dispatches to the override below
+        // and would otherwise scan the header with an unset quote character.
+        String quoteString = configuration.get(ExternalDataConstants.KEY_QUOTE);
+        if (quoteString == null || quoteString.length() != 1) {
+            throw new AsterixException(ExternalDataExceptionUtils.incorrectParameterMessage(
+                    ExternalDataConstants.KEY_QUOTE, ExternalDataConstants.PARAMETER_OF_SIZE_ONE, quoteString));
+        }
+        this.quote = quoteString.charAt(0);
+        super.configure(configuration);
+    }
+
+    /**
+     * Reads the next record into {@code record}, ignoring line terminators inside quotes.
+     * <p>
+     * Every call resets {@code record} and consumes input, so this method is not idempotent.
+     *
+     * @return {@code true} if a record was read, {@code false} if the input is exhausted
+     * @throws IOException
+     *             if reading fails, or if the input ends while inside a quote
+     */
+    @Override
+    public boolean hasNext() throws IOException {
+        newlineLength = 0;
+        prevCharCR = false;
+        prevCharEscape = false;
+        record.reset();
+        int readLength = 0; // bytes taken from the current buffer chunk
+        int recordLength = 0; // bytes appended to record across all buffer refills
+        inQuote = false;
+        do {
+            int startPosn = bufferPosn;
+            if (bufferPosn >= bufferLength) {
+                startPosn = bufferPosn = 0;
+                bufferLength = reader.read(inputBuffer);
+                if (bufferLength <= 0) {
+                    // EOF. Decide on the total appended so far, not on the last chunk only:
+                    // the last chunk may have contributed nothing (e.g. it held just a deferred
+                    // CR) while earlier chunks already filled the record.
+                    if (recordLength > 0) {
+                        if (inQuote) {
+                            throw new IOException("malformed input record ended inside quote");
+                        }
+                        record.endRecord();
+                        recordNumber++;
+                        return true;
+                    }
+                    return false;
+                }
+            }
+            for (; bufferPosn < bufferLength; ++bufferPosn) {
+                if (!inQuote) {
+                    if (inputBuffer[bufferPosn] == ExternalDataConstants.LF) {
+                        newlineLength = (prevCharCR) ? 2 : 1;
+                        ++bufferPosn; // at next invocation proceed from following byte
+                        break;
+                    }
+                    if (prevCharCR) { // CR + notLF, we are at notLF
+                        newlineLength = 1;
+                        break;
+                    }
+                    prevCharCR = (inputBuffer[bufferPosn] == ExternalDataConstants.CR);
+                    if (inputBuffer[bufferPosn] == quote) {
+                        if (!prevCharEscape) {
+                            inQuote = true;
+                        }
+                    }
+                    if (prevCharEscape) {
+                        prevCharEscape = false;
+                    } else {
+                        prevCharEscape = inputBuffer[bufferPosn] == ExternalDataConstants.ESCAPE;
+                    }
+                } else {
+                    // only look for next quote
+                    if (inputBuffer[bufferPosn] == quote) {
+                        if (!prevCharEscape) {
+                            inQuote = false;
+                        }
+                    }
+                    prevCharEscape = inputBuffer[bufferPosn] == ExternalDataConstants.ESCAPE;
+                }
+            }
+            readLength = bufferPosn - startPosn;
+            if (prevCharCR && newlineLength == 0) {
+                --readLength; // CR at the end of the buffer
+            }
+            if (readLength > 0) {
+                record.append(inputBuffer, startPosn, readLength);
+                recordLength += readLength;
+            }
+        } while (newlineLength == 0);
+        recordNumber++;
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/RCLookupReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/RCLookupReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/RCLookupReader.java
new file mode 100644
index 0000000..5c33502
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/RCLookupReader.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader;
+
+import java.io.IOException;
+
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
+import org.apache.asterix.external.indexing.RecordId;
+import org.apache.asterix.external.input.record.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.RCFile.Reader;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.log4j.Logger;
+
+/**
+ * Lookup reader for RC files: positions an {@code RCFile.Reader} at a record id's offset and
+ * advances row by row until the requested row is reached.
+ */
+public class RCLookupReader extends AbstractHDFSLookupRecordReader<BytesRefArrayWritable> {
+    public RCLookupReader(ExternalFileIndexAccessor snapshotAccessor, FileSystem fs, Configuration conf) {
+        super(snapshotAccessor, fs, conf);
+    }
+
+    private static final Logger LOGGER = Logger.getLogger(RCLookupReader.class.getName());
+    private Reader reader;
+    private LongWritable key = new LongWritable();
+    private BytesRefArrayWritable value = new BytesRefArrayWritable();
+    private GenericRecord<BytesRefArrayWritable> record = new GenericRecord<BytesRefArrayWritable>();
+    // offset of the most recent lookup; -1 right after a file is opened
+    private long offset;
+    // index of the last row read at the current offset; -1 when no row has been read yet
+    private int row;
+
+    @Override
+    public Class<?> getRecordClass() throws IOException {
+        return Writable.class;
+    }
+
+    /**
+     * Returns the record identified by {@code rid} from the currently open file.
+     *
+     * @param rid
+     *            the record id, providing the offset and the row to read
+     * @return the shared record instance, set to the requested row
+     * @throws IOException
+     *             if reading fails or the file ends before the requested row is reached
+     */
+    @Override
+    protected IRawRecord<BytesRefArrayWritable> lookup(RecordId rid) throws IOException {
+        if (rid.getOffset() != offset) {
+            offset = rid.getOffset();
+            if (reader.getPosition() != offset) {
+                reader.seek(offset);
+            }
+            reader.resetBuffer();
+            row = -1;
+        }
+
+        // skip rows to the record row
+        while (row < rid.getRow()) {
+            if (!reader.next(key)) {
+                // no more rows: fail instead of silently returning the previously read value
+                throw new IOException("Reached the end of " + file.getFileName() + " before row " + rid.getRow()
+                        + " at offset " + offset);
+            }
+            reader.getCurrentRow(value);
+            row++;
+        }
+        record.set(value);
+        return record;
+    }
+
+    /**
+     * Closes the current file reader, if any; a failure to close is logged and not propagated.
+     */
+    @Override
+    protected void closeFile() {
+        if (reader == null) {
+            return;
+        }
+        try {
+            reader.close();
+        } catch (Exception e) {
+            LOGGER.warn("Error closing HDFS file", e);
+        }
+    }
+
+    /**
+     * Opens a reader on the current file and clears the remembered offset and row.
+     */
+    @Override
+    protected void openFile() throws IllegalArgumentException, IOException {
+        reader = new Reader(fs, new Path(file.getFileName()), conf);
+        offset = -1;
+        row = -1;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/RSSRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/RSSRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/RSSRecordReader.java
new file mode 100644
index 0000000..1c2dc30
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/RSSRecordReader.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.input.record.GenericRecord;
+import org.apache.log4j.Logger;
+
+import com.sun.syndication.feed.synd.SyndEntryImpl;
+import com.sun.syndication.feed.synd.SyndFeed;
+import com.sun.syndication.fetcher.FeedFetcher;
+import com.sun.syndication.fetcher.FetcherEvent;
+import com.sun.syndication.fetcher.FetcherException;
+import com.sun.syndication.fetcher.FetcherListener;
+import com.sun.syndication.fetcher.impl.FeedFetcherCache;
+import com.sun.syndication.fetcher.impl.HashMapFeedInfoCache;
+import com.sun.syndication.fetcher.impl.HttpURLFeedFetcher;
+import com.sun.syndication.io.FeedException;
+
+/**
+ * Record reader that retrieves an RSS feed from a URL and hands out its entries one at a time.
+ */
+public class RSSRecordReader implements IRecordReader<SyndEntryImpl> {
+
+    private static final Logger LOGGER = Logger.getLogger(RSSRecordReader.class.getName());
+    private boolean modified = false;
+    private Queue<SyndEntryImpl> rssFeedBuffer = new LinkedList<SyndEntryImpl>();
+    private FeedFetcherCache feedInfoCache;
+    private FeedFetcher fetcher;
+    private FetcherEventListenerImpl listener;
+    private URL feedUrl;
+    private GenericRecord<SyndEntryImpl> record = new GenericRecord<SyndEntryImpl>();
+    private boolean done = false;
+
+    /**
+     * Creates a reader for the given feed and registers a fetcher event listener.
+     *
+     * @param url
+     *            the feed URL
+     * @throws MalformedURLException
+     *             if {@code url} is not a valid URL
+     */
+    public RSSRecordReader(String url) throws MalformedURLException {
+        this.feedUrl = new URL(url);
+        this.feedInfoCache = HashMapFeedInfoCache.getInstance();
+        this.fetcher = new HttpURLFeedFetcher(this.feedInfoCache);
+        this.listener = new FetcherEventListenerImpl(this, LOGGER);
+        this.fetcher.addFetcherEventListener(this.listener);
+    }
+
+    /**
+     * @return the current value of the modified flag
+     */
+    public boolean isModified() {
+        return this.modified;
+    }
+
+    /**
+     * Unregisters the fetcher event listener.
+     */
+    @Override
+    public void close() throws IOException {
+        this.fetcher.removeFetcherEventListener(this.listener);
+    }
+
+    /**
+     * Does nothing; this reader takes no configuration.
+     */
+    @Override
+    public void configure(Map<String, String> configurations) throws Exception {
+    }
+
+    /**
+     * @return {@code true} until {@link #stop()} has been called
+     */
+    @Override
+    public boolean hasNext() throws Exception {
+        return !this.done;
+    }
+
+    /**
+     * Returns the next feed entry, fetching the feed when the local buffer is empty.
+     *
+     * @return the shared record set to the next entry, or {@code null} if the reader was stopped
+     *         or no entry is available
+     * @throws IOException
+     *             wrapping any failure raised while obtaining the entry
+     */
+    @Override
+    public IRawRecord<SyndEntryImpl> next() throws IOException {
+        if (this.done) {
+            return null;
+        }
+        try {
+            final SyndEntryImpl entry = getNextRSSFeed();
+            if (entry != null) {
+                this.record.set(entry);
+                return this.record;
+            }
+            return null;
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    public Class<SyndEntryImpl> getRecordClass() throws IOException {
+        return SyndEntryImpl.class;
+    }
+
+    /**
+     * Marks the reader as done.
+     *
+     * @return always {@code true}
+     */
+    @Override
+    public boolean stop() {
+        this.done = true;
+        return true;
+    }
+
+    /**
+     * @param modified
+     *            the new value of the modified flag
+     */
+    public void setModified(boolean modified) {
+        this.modified = modified;
+    }
+
+    /**
+     * Takes the head of the entry buffer, refilling it from the feed first if it is empty.
+     *
+     * @return the next buffered entry, or {@code null} if the buffer is still empty after fetching
+     */
+    private SyndEntryImpl getNextRSSFeed() throws Exception {
+        if (this.rssFeedBuffer.isEmpty()) {
+            fetchFeed();
+        }
+        // poll() yields null on an empty queue
+        return this.rssFeedBuffer.poll();
+    }
+
+    /**
+     * Retrieves the feed and, if the modified flag is set, appends all of its entries to the buffer.
+     */
+    @SuppressWarnings("unchecked")
+    private void fetchFeed() throws IllegalArgumentException, IOException, FeedException, FetcherException {
+        // Retrieve the feed.
+        // We will get a Feed Polled Event and then a
+        // Feed Retrieved event (assuming the feed is valid)
+        final SyndFeed feed = this.fetcher.retrieveFeed(this.feedUrl);
+        if (!this.modified) {
+            return;
+        }
+        if (LOGGER.isInfoEnabled()) {
+            LOGGER.info(feedUrl + " retrieved");
+            LOGGER.info(feedUrl + " has a title: " + feed.getTitle() + " and contains " + feed.getEntries().size()
+                    + " entries.");
+        }
+        final List<? extends SyndEntryImpl> fetchedFeeds = feed.getEntries();
+        this.rssFeedBuffer.addAll(fetchedFeeds);
+    }
+}
+
+/**
+ * Fetcher listener that logs feed events and updates the modified flag of the owning
+ * {@link RSSRecordReader}.
+ */
+class FetcherEventListenerImpl implements FetcherListener {
+
+    private final RSSRecordReader reader;
+    private final Logger LOGGER;
+
+    /**
+     * @param reader
+     *            the reader whose modified flag is updated on feed events
+     * @param LOGGER
+     *            the logger used to report feed events
+     */
+    public FetcherEventListenerImpl(RSSRecordReader reader, Logger LOGGER) {
+        this.reader = reader;
+        this.LOGGER = LOGGER;
+    }
+
+    /**
+     * Logs the event; marks the reader as modified when a feed was retrieved and as unmodified
+     * when the feed is reported unchanged.
+     *
+     * @see com.sun.syndication.fetcher.FetcherListener#fetcherEvent(com.sun.syndication.fetcher.FetcherEvent)
+     */
+    @Override
+    public void fetcherEvent(FetcherEvent event) {
+        String eventType = event.getEventType();
+        if (FetcherEvent.EVENT_TYPE_FEED_POLLED.equals(eventType)) {
+            if (LOGGER.isInfoEnabled()) {
+                LOGGER.info("\tEVENT: Feed Polled. URL = " + event.getUrlString());
+            }
+        } else if (FetcherEvent.EVENT_TYPE_FEED_RETRIEVED.equals(eventType)) {
+            if (LOGGER.isInfoEnabled()) {
+                LOGGER.info("\tEVENT: Feed Retrieved. URL = " + event.getUrlString());
+            }
+            reader.setModified(true);
+        } else if (FetcherEvent.EVENT_TYPE_FEED_UNCHANGED.equals(eventType)) {
+            if (LOGGER.isInfoEnabled()) {
+                LOGGER.info("\tEVENT: Feed Unchanged. URL = " + event.getUrlString());
+            }
+            // An unchanged feed carries no new entries: clear the flag so that the reader's
+            // fetchFeed() does not buffer the same entries again.
+            reader.setModified(false);
+        }
+    }
+}



Mime
View raw message