asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject [3/5] incubator-asterixdb git commit: Add flush() to IFrameWriter
Date Fri, 29 Jan 2016 23:52:16 GMT
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/AbstractHDFSLookupRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/AbstractHDFSLookupRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/AbstractHDFSLookupRecordReader.java
new file mode 100644
index 0000000..28abddb
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/AbstractHDFSLookupRecordReader.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.hdfs;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.asterix.external.api.ILookupRecordReader;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
+import org.apache.asterix.external.indexing.RecordId;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public abstract class AbstractHDFSLookupRecordReader<T> implements ILookupRecordReader<T> {
+
+    protected int fileId;
+    private ExternalFileIndexAccessor snapshotAccessor;
+    protected ExternalFile file;
+    protected FileSystem fs;
+    protected Configuration conf;
+    protected boolean replaced;
+
+    public AbstractHDFSLookupRecordReader(ExternalFileIndexAccessor snapshotAccessor, FileSystem fs,
+            Configuration conf) {
+        this.snapshotAccessor = snapshotAccessor;
+        this.fs = fs;
+        this.conf = conf;
+        this.fileId = -1;
+        this.file = new ExternalFile();
+    }
+
+    @Override
+    public void configure(Map<String, String> configurations) throws Exception {
+    }
+
+    @Override
+    public IRawRecord<T> read(RecordId rid) throws Exception {
+        if (rid.getFileId() != fileId) {
+            // close current file
+            closeFile();
+            // lookup new file
+            snapshotAccessor.lookup(rid.getFileId(), file);
+            fileId = rid.getFileId();
+            try {
+                validate();
+                if (!replaced) {
+                    openFile();
+                    validate();
+                    if (replaced) {
+                        closeFile();
+                    }
+                }
+            } catch (FileNotFoundException e) {
+                replaced = true;
+            }
+        }
+        if (replaced) {
+            return null;
+        }
+        return lookup(rid);
+    }
+
+    protected abstract IRawRecord<T> lookup(RecordId rid) throws IOException;
+
+    private void validate() throws IllegalArgumentException, IOException {
+        FileStatus fileStatus = fs.getFileStatus(new Path(file.getFileName()));
+        replaced = fileStatus.getModificationTime() != file.getLastModefiedTime().getTime();
+    }
+
+    protected abstract void closeFile();
+
+    protected abstract void openFile() throws IllegalArgumentException, IOException;
+
+    @Override
+    public final void open() throws HyracksDataException {
+        snapshotAccessor.open();
+    }
+
+    @Override
+    public void close() throws IOException {
+        try {
+            closeFile();
+        } finally {
+            snapshotAccessor.close();
+        }
+    }
+
+    @Override
+    public void fail() {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSLookupReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSLookupReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSLookupReaderFactory.java
new file mode 100644
index 0000000..c302b9b
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSLookupReaderFactory.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.hdfs;
+
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.ILookupReaderFactory;
+import org.apache.asterix.external.api.ILookupRecordReader;
+import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.HDFSUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.hdfs.dataflow.ConfFactory;
+
+/**
+ * Factory for HDFS lookup record readers. The concrete reader is chosen from the input format
+ * named in the configuration passed to {@link #configure(Map)}.
+ */
+public class HDFSLookupReaderFactory<T> implements ILookupReaderFactory<T> {
+
+    protected static final long serialVersionUID = 1L;
+    protected transient AlgebricksPartitionConstraint clusterLocations;
+    protected ConfFactory confFactory;
+    protected Map<String, String> configuration;
+
+    public HDFSLookupReaderFactory() {
+    }
+
+    @Override
+    public DataSourceType getDataSourceType() {
+        return DataSourceType.RECORDS;
+    }
+
+    @Override
+    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+        clusterLocations = HDFSUtils.getPartitionConstraints(clusterLocations);
+        return clusterLocations;
+    }
+
+    /**
+     * Stores the configuration and builds the HDFS job configuration factory from it.
+     *
+     * @param configuration
+     *            the adapter configuration
+     */
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        this.configuration = configuration;
+        JobConf conf = HDFSUtils.configureHDFSJobConf(configuration);
+        confFactory = new ConfFactory(conf);
+
+    }
+
+    @Override
+    public boolean isIndexible() {
+        return false;
+    }
+
+    /**
+     * Creates the lookup reader matching the configured input format.
+     *
+     * @param ctx
+     *            the task context (not used by this factory)
+     * @param partition
+     *            the partition number (not used by this factory)
+     * @param snapshotAccessor
+     *            the accessor handed to the created reader
+     * @return a text, sequence or RC lookup reader
+     * @throws Exception
+     *             an {@link AsterixException} if the input format is missing or not recognised,
+     *             or whatever obtaining the job configuration or the file system throws
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public ILookupRecordReader<? extends T> createRecordReader(IHyracksTaskContext ctx, int partition,
+            ExternalFileIndexAccessor snapshotAccessor) throws Exception {
+        String rawInputFormat = configuration.get(ExternalDataConstants.KEY_INPUT_FORMAT);
+        if (rawInputFormat == null) {
+            // fail with a meaningful message instead of a NullPointerException from trim()
+            throw new AsterixException(
+                    "Missing required parameter: " + ExternalDataConstants.KEY_INPUT_FORMAT);
+        }
+        String inputFormatParameter = rawInputFormat.trim();
+        JobConf conf = confFactory.getConf();
+        FileSystem fs = FileSystem.get(conf);
+        switch (inputFormatParameter) {
+            case ExternalDataConstants.INPUT_FORMAT_TEXT:
+                return (ILookupRecordReader<? extends T>) new TextLookupReader(snapshotAccessor, fs, conf);
+            case ExternalDataConstants.INPUT_FORMAT_SEQUENCE:
+                return (ILookupRecordReader<? extends T>) new SequenceLookupReader(snapshotAccessor, fs, conf);
+            case ExternalDataConstants.INPUT_FORMAT_RC:
+                return (ILookupRecordReader<? extends T>) new RCLookupReader(snapshotAccessor, fs, conf);
+            default:
+                throw new AsterixException("Unrecognised input format: " + inputFormatParameter);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSRecordReader.java
new file mode 100644
index 0000000..564d55a
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSRecordReader.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.hdfs;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.external.api.IExternalIndexer;
+import org.apache.asterix.external.api.IIndexingDatasource;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.external.input.record.GenericRecord;
+import org.apache.asterix.external.input.record.reader.EmptyRecordReader;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class HDFSRecordReader<K, V extends Writable> implements IRecordReader<Writable>, IIndexingDatasource {
+
+    protected RecordReader<K, Writable> reader;
+    protected V value = null;
+    protected K key = null;
+    protected int currentSplitIndex = 0;
+    protected boolean read[];
+    protected InputFormat<?, ?> inputFormat;
+    protected InputSplit[] inputSplits;
+    protected String[] readSchedule;
+    protected String nodeName;
+    protected JobConf conf;
+    protected GenericRecord<Writable> record;
+    // Indexing variables
+    protected IExternalIndexer indexer;
+    protected List<ExternalFile> snapshot;
+    protected FileSystem hdfs;
+
+    public HDFSRecordReader(boolean read[], InputSplit[] inputSplits, String[] readSchedule, String nodeName,
+            JobConf conf) {
+        this.read = read;
+        this.inputSplits = inputSplits;
+        this.readSchedule = readSchedule;
+        this.nodeName = nodeName;
+        this.conf = conf;
+        this.inputFormat = conf.getInputFormat();
+        this.reader = new EmptyRecordReader<K, Writable>();
+    }
+
+    @Override
+    public void close() throws IOException {
+        reader.close();
+    }
+
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        record = new GenericRecord<Writable>();
+        nextInputSplit();
+    }
+
+    @Override
+    public boolean hasNext() throws Exception {
+        if (reader.next(key, value)) {
+            return true;
+        }
+        while (nextInputSplit()) {
+            if (reader.next(key, value)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public IRawRecord<Writable> next() throws IOException {
+        record.set(value);
+        return record;
+    }
+
+    private boolean nextInputSplit() throws IOException {
+        for (; currentSplitIndex < inputSplits.length; currentSplitIndex++) {
+            /**
+             * read all the partitions scheduled to the current node
+             */
+            if (readSchedule[currentSplitIndex].equals(nodeName)) {
+                /**
+                 * pick an unread split to read synchronize among
+                 * simultaneous partitions in the same machine
+                 */
+                synchronized (read) {
+                    if (read[currentSplitIndex] == false) {
+                        read[currentSplitIndex] = true;
+                    } else {
+                        continue;
+                    }
+                }
+                if (snapshot != null) {
+                    String fileName = ((FileSplit) (inputSplits[currentSplitIndex])).getPath().toUri().getPath();
+                    FileStatus fileStatus = hdfs.getFileStatus(new Path(fileName));
+                    // Skip if not the same file stored in the files snapshot
+                    if (fileStatus.getModificationTime() != snapshot.get(currentSplitIndex).getLastModefiedTime()
+                            .getTime())
+                        continue;
+                }
+
+                reader.close();
+                reader = getRecordReader(currentSplitIndex);
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @SuppressWarnings("unchecked")
+    private RecordReader<K, Writable> getRecordReader(int splitIndex) throws IOException {
+        reader = (RecordReader<K, Writable>) inputFormat.getRecordReader(inputSplits[splitIndex], conf, Reporter.NULL);
+        if (key == null) {
+            key = reader.createKey();
+            value = (V) reader.createValue();
+        }
+        if (indexer != null) {
+            try {
+                indexer.reset(this);
+            } catch (Exception e) {
+                throw new HyracksDataException(e);
+            }
+        }
+        return reader;
+    }
+
+    @Override
+    public boolean stop() {
+        return false;
+    }
+
+    @Override
+    public IExternalIndexer getIndexer() {
+        return indexer;
+    }
+
+    @Override
+    public void setIndexer(IExternalIndexer indexer) {
+        this.indexer = indexer;
+    }
+
+    public List<ExternalFile> getSnapshot() {
+        return snapshot;
+    }
+
+    public void setSnapshot(List<ExternalFile> snapshot) throws IOException {
+        this.snapshot = snapshot;
+        hdfs = FileSystem.get(conf);
+    }
+
+    public int getCurrentSplitIndex() {
+        return currentSplitIndex;
+    }
+
+    public RecordReader<K, Writable> getReader() {
+        return reader;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSTextLineReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSTextLineReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSTextLineReader.java
new file mode 100644
index 0000000..90f1558
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSTextLineReader.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.hdfs;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Buffered line reader over an {@link FSDataInputStream} that keeps track of the file position
+ * of the next unread byte, so that {@link #seek(long)} can move inside the buffer when the
+ * target position is already buffered.
+ */
+public class HDFSTextLineReader {
+    private static final int DEFAULT_BUFFER_SIZE = 32 * 1024;
+    private int bufferSize = DEFAULT_BUFFER_SIZE;
+    private FSDataInputStream reader;
+
+    private byte[] buffer;
+    // the number of bytes of real data in the buffer
+    private int bufferLength = 0;
+    // the current position in the buffer
+    private int bufferPosn = 0;
+
+    // the file position corresponding to bufferPosn
+    private long currentFilePos = 0L;
+
+    private static final byte CR = '\r';
+    private static final byte LF = '\n';
+
+    public static final String KEY_BUFFER_SIZE = "io.file.buffer.size";
+
+    /**
+     * Create a line reader that reads from the given stream using the
+     * default buffer-size (32k).
+     *
+     * @param in
+     *            The input stream
+     * @throws IOException
+     */
+    public HDFSTextLineReader(FSDataInputStream in) throws IOException {
+        this(in, DEFAULT_BUFFER_SIZE);
+    }
+
+    /**
+     * Create a line reader that reads from the given stream using the
+     * given buffer-size.
+     *
+     * @param in
+     *            The input stream
+     * @param bufferSize
+     *            Size of the read buffer
+     * @throws IOException
+     */
+    public HDFSTextLineReader(FSDataInputStream in, int bufferSize) throws IOException {
+        this.reader = in;
+        this.bufferSize = bufferSize;
+        this.buffer = new byte[this.bufferSize];
+        currentFilePos = in.getPos();
+    }
+
+    /**
+     * Create a line reader with the default buffer-size and no stream attached. A stream must
+     * be supplied through {@link #resetReader(FSDataInputStream)} before reading or seeking.
+     *
+     * @throws IOException
+     */
+    public HDFSTextLineReader() throws IOException {
+        this.bufferSize = DEFAULT_BUFFER_SIZE;
+        this.buffer = new byte[this.bufferSize];
+    }
+
+    /**
+     * Create a line reader that reads from the given stream using the <code>io.file.buffer.size</code> specified in the given <code>Configuration</code>.
+     *
+     * @param in
+     *            input stream
+     * @param conf
+     *            configuration
+     * @throws IOException
+     */
+    public HDFSTextLineReader(FSDataInputStream in, Configuration conf) throws IOException {
+        this(in, conf.getInt(KEY_BUFFER_SIZE, DEFAULT_BUFFER_SIZE));
+    }
+
+    /**
+     * Read one line from the InputStream into the given Text. A line
+     * can be terminated by one of the following: '\n' (LF) , '\r' (CR),
+     * or '\r\n' (CR+LF). EOF also terminates an otherwise unterminated
+     * line.
+     *
+     * @param str
+     *            the object to store the given line (without newline)
+     * @param maxLineLength
+     *            the maximum number of bytes to store into str;
+     *            the rest of the line is silently discarded.
+     * @param maxBytesToConsume
+     *            the maximum number of bytes to consume
+     *            in this call. This is only a hint, because if the line cross
+     *            this threshold, we allow it to happen. It can overshoot
+     *            potentially by as much as one buffer length.
+     * @return the number of bytes read including the (longest) newline
+     *         found.
+     * @throws IOException
+     *             if the underlying stream throws
+     */
+    public int readLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException {
+        /* We're reading data from in, but the head of the stream may be
+         * already buffered in buffer, so we have several cases:
+         * 1. No newline characters are in the buffer, so we need to copy
+         *    everything and read another buffer from the stream.
+         * 2. An unambiguously terminated line is in buffer, so we just
+         *    copy to str.
+         * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
+         *    in CR.  In this case we copy everything up to CR to str, but
+         *    we also need to see what follows CR: if it's LF, then we
+         *    need consume LF as well, so next call to readLine will read
+         *    from after that.
+         * We use a flag prevCharCR to signal if previous character was CR
+         * and, if it happens to be at the end of the buffer, delay
+         * consuming it until we have a chance to look at the char that
+         * follows.
+         */
+        str.clear();
+        int txtLength = 0; //tracks str.getLength(), as an optimization
+        int newlineLength = 0; //length of terminating newline
+        boolean prevCharCR = false; //true if prev char was CR
+        long bytesConsumed = 0;
+        do {
+            int startPosn = bufferPosn; //starting from where we left off the last time
+            if (bufferPosn >= bufferLength) {
+                startPosn = bufferPosn = 0;
+                if (prevCharCR) {
+                    ++bytesConsumed; //account for CR from previous read
+                }
+                bufferLength = reader.read(buffer);
+                if (bufferLength <= 0) {
+                    // keep the "bytes of real data" invariant: a negative length would make
+                    // the file position computed below point past the stream position
+                    bufferLength = 0;
+                    break; // EOF
+                }
+            }
+            for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
+                if (buffer[bufferPosn] == LF) {
+                    newlineLength = (prevCharCR) ? 2 : 1;
+                    ++bufferPosn; // at next invocation proceed from following byte
+                    break;
+                }
+                if (prevCharCR) { //CR + notLF, we are at notLF
+                    newlineLength = 1;
+                    break;
+                }
+                prevCharCR = (buffer[bufferPosn] == CR);
+            }
+            int readLength = bufferPosn - startPosn;
+            if (prevCharCR && newlineLength == 0) {
+                --readLength; //CR at the end of the buffer
+            }
+            bytesConsumed += readLength;
+            int appendLength = readLength - newlineLength;
+            if (appendLength > maxLineLength - txtLength) {
+                appendLength = maxLineLength - txtLength;
+            }
+            if (appendLength > 0) {
+                str.append(buffer, startPosn, appendLength);
+                txtLength += appendLength;
+            }
+        } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);
+
+        if (bytesConsumed > Integer.MAX_VALUE) {
+            throw new IOException("Too many bytes before newline: " + bytesConsumed);
+        }
+        // the stream is positioned at the end of the buffer; step back to the next unread byte
+        currentFilePos = reader.getPos() - bufferLength + bufferPosn;
+        return (int) bytesConsumed;
+    }
+
+    /**
+     * Read from the InputStream into the given Text.
+     *
+     * @param str
+     *            the object to store the given line
+     * @param maxLineLength
+     *            the maximum number of bytes to store into str.
+     * @return the number of bytes read including the newline
+     * @throws IOException
+     *             if the underlying stream throws
+     */
+    public int readLine(Text str, int maxLineLength) throws IOException {
+        return readLine(str, maxLineLength, Integer.MAX_VALUE);
+    }
+
+    /**
+     * Read from the InputStream into the given Text.
+     *
+     * @param str
+     *            the object to store the given line
+     * @return the number of bytes read including the newline
+     * @throws IOException
+     *             if the underlying stream throws
+     */
+    public int readLine(Text str) throws IOException {
+        return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
+    }
+
+    /**
+     * Position the reader so that the next read starts at the given file position. If the
+     * position lies between the current position and the end of the buffered data, only the
+     * buffer position is moved; otherwise the stream is repositioned and the buffer discarded.
+     *
+     * @param desired
+     *            the file position to move to
+     * @throws IOException
+     *             if the underlying stream throws
+     */
+    public void seek(long desired) throws IOException {
+        if (reader.getPos() <= desired || currentFilePos > desired) {
+            // desired position is ahead of stream or before the current position, seek to position
+            reader.seek(desired);
+            bufferLength = 0;
+            bufferPosn = 0;
+            currentFilePos = desired;
+        } else if (currentFilePos < desired) {
+            // desired position is in the buffer
+            int difference = (int) (desired - currentFilePos);
+            bufferPosn += difference;
+            currentFilePos = desired;
+        }
+    }
+
+    public FSDataInputStream getReader() {
+        return reader;
+    }
+
+    /**
+     * Attach a new stream and discard any buffered data.
+     *
+     * @param reader
+     *            the stream to read from
+     * @throws IOException
+     *             if the stream position cannot be obtained
+     */
+    public void resetReader(FSDataInputStream reader) throws IOException {
+        this.reader = reader;
+        bufferLength = 0;
+        bufferPosn = 0;
+        currentFilePos = reader.getPos();
+    }
+
+    /**
+     * Close the underlying stream, if one is attached.
+     *
+     * @throws IOException
+     *             if the underlying stream throws
+     */
+    public void close() throws IOException {
+        // no stream is attached after the no-arg constructor until resetReader is called
+        if (reader != null) {
+            reader.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/RCLookupReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/RCLookupReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/RCLookupReader.java
new file mode 100644
index 0000000..95d76ba
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/RCLookupReader.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.hdfs;
+
+import java.io.IOException;
+
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
+import org.apache.asterix.external.indexing.RecordId;
+import org.apache.asterix.external.input.record.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.RCFile.Reader;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.log4j.Logger;
+
+/**
+ * Lookup reader for RC files. A record is addressed by the offset and the row number carried
+ * by its {@link RecordId}: the reader is positioned at the offset and rows are read until the
+ * requested row is reached.
+ */
+public class RCLookupReader extends AbstractHDFSLookupRecordReader<BytesRefArrayWritable> {
+    public RCLookupReader(ExternalFileIndexAccessor snapshotAccessor, FileSystem fs, Configuration conf) {
+        super(snapshotAccessor, fs, conf);
+    }
+
+    private static final Logger LOGGER = Logger.getLogger(RCLookupReader.class.getName());
+    private Reader reader;
+    private LongWritable key = new LongWritable();
+    private BytesRefArrayWritable value = new BytesRefArrayWritable();
+    private GenericRecord<BytesRefArrayWritable> record = new GenericRecord<BytesRefArrayWritable>();
+    // offset the reader was last positioned at, -1 right after a file is opened
+    private long offset;
+    // row last read at that offset, -1 when none has been read yet
+    private int row;
+
+    @Override
+    public Class<?> getRecordClass() throws IOException {
+        return Writable.class;
+    }
+
+    /**
+     * Returns the row identified by {@code rid} from the currently open file.
+     *
+     * @param rid
+     *            the record id supplying the offset and the row number
+     * @return the shared record holder, set to the requested row
+     * @throws IOException
+     *             if no file is open, if the requested row cannot be read, or if the underlying
+     *             reader throws
+     */
+    @Override
+    protected IRawRecord<BytesRefArrayWritable> lookup(RecordId rid) throws IOException {
+        if (reader == null) {
+            throw new IOException("No RC file is currently open");
+        }
+        // Reposition for a different offset, and also for an earlier row at the same offset:
+        // rows are only read forward, so going back requires starting over from the offset.
+        if (rid.getOffset() != offset || rid.getRow() < row) {
+            offset = rid.getOffset();
+            if (reader.getPosition() != offset) {
+                reader.seek(offset);
+            }
+            reader.resetBuffer();
+            row = -1;
+        }
+
+        // skip rows to the record row
+        while (row < rid.getRow()) {
+            if (!reader.next(key)) {
+                // do not hand back whatever row was read last as if it were the requested one
+                throw new IOException("Row " + rid.getRow() + " not found at offset " + offset + " in file "
+                        + file.getFileName());
+            }
+            reader.getCurrentRow(value);
+            row++;
+        }
+        record.set(value);
+        return record;
+    }
+
+    /**
+     * Closes the current reader, if there is one. Failures are logged rather than propagated,
+     * and the reader is dropped either way so that it is not closed a second time.
+     */
+    @Override
+    protected void closeFile() {
+        if (reader == null) {
+            return;
+        }
+        try {
+            reader.close();
+        } catch (Exception e) {
+            LOGGER.warn("Error closing HDFS file", e);
+        } finally {
+            reader = null;
+        }
+    }
+
+    /**
+     * Opens a reader on the current file and forgets the previous position.
+     */
+    @Override
+    protected void openFile() throws IllegalArgumentException, IOException {
+        reader = new Reader(fs, new Path(file.getFileName()), conf);
+        offset = -1;
+        row = -1;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/SequenceLookupReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/SequenceLookupReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/SequenceLookupReader.java
new file mode 100644
index 0000000..23e647f
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/SequenceLookupReader.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.hdfs;
+
+import java.io.IOException;
+
+import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
+import org.apache.asterix.external.indexing.RecordId;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Reader;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.log4j.Logger;
+
+/**
+ * Lookup reader for HDFS sequence files: each record is fetched by seeking to the
+ * offset stored in its {@link RecordId} and reading the key/value pair found there.
+ */
+public class SequenceLookupReader extends AbstractCharRecordLookupReader {
+
+    private static final Logger LOGGER = Logger.getLogger(SequenceLookupReader.class.getName());
+    private Reader reader;
+    private Writable key;
+
+    public SequenceLookupReader(ExternalFileIndexAccessor snapshotAccessor, FileSystem fs, Configuration conf) {
+        super(snapshotAccessor, fs, conf);
+    }
+
+    /**
+     * Seeks to the offset of {@code rid} and reads the pair at that position into
+     * {@code key} and {@code value}.
+     */
+    @Override
+    protected void readRecord(RecordId rid) throws IOException {
+        final long position = rid.getOffset();
+        reader.seek(position);
+        reader.next(key, value);
+    }
+
+    /**
+     * Closes the current reader, if any; a failure is logged as a warning only.
+     */
+    @Override
+    protected void closeFile() {
+        if (reader != null) {
+            try {
+                reader.close();
+            } catch (Exception e) {
+                LOGGER.warn("Error closing HDFS file ", e);
+            }
+        }
+    }
+
+    /**
+     * Opens a sequence-file reader on the current file and instantiates key and
+     * value holders of the classes declared by that file.
+     */
+    @SuppressWarnings("deprecation")
+    @Override
+    protected void openFile() throws IllegalArgumentException, IOException {
+        final Path path = new Path(file.getFileName());
+        reader = new SequenceFile.Reader(fs, path, conf);
+        key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+        value = (Text) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/TextLookupReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/TextLookupReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/TextLookupReader.java
new file mode 100644
index 0000000..2e1a11a
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/TextLookupReader.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.hdfs;
+
+import java.io.IOException;
+
+import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
+import org.apache.asterix.external.indexing.RecordId;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Lookup reader for HDFS text files: each record is the line that starts at the
+ * offset stored in its {@link RecordId}.
+ */
+public class TextLookupReader extends AbstractCharRecordLookupReader {
+
+    private static final Logger LOGGER = Logger.getLogger(TextLookupReader.class.getName());
+    private HDFSTextLineReader reader;
+
+    public TextLookupReader(ExternalFileIndexAccessor snapshotAccessor, FileSystem fs, Configuration conf) {
+        super(snapshotAccessor, fs, conf);
+    }
+
+    /**
+     * Seeks to the offset of {@code rid} and reads one line into {@code value}.
+     */
+    @Override
+    protected void readRecord(RecordId rid) throws IOException {
+        final long position = rid.getOffset();
+        reader.seek(position);
+        reader.readLine(value);
+    }
+
+    /**
+     * Closes the line reader, if one exists; a failure is logged as a warning only.
+     */
+    @Override
+    protected void closeFile() {
+        if (reader != null) {
+            try {
+                reader.close();
+            } catch (Exception e) {
+                LOGGER.warn("Error closing HDFS file ", e);
+            }
+        }
+    }
+
+    /**
+     * Points the line reader at the current file. The reader object is created on
+     * first use and reused for later files via {@code resetReader}.
+     */
+    @Override
+    protected void openFile() throws IllegalArgumentException, IOException {
+        if (reader == null) {
+            reader = new HDFSTextLineReader();
+        }
+        final Path path = new Path(file.getFileName());
+        reader.resetReader(fs.open(path));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReader.java
new file mode 100644
index 0000000..13cd26a
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReader.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.rss;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.input.record.GenericRecord;
+import org.apache.log4j.Logger;
+
+import com.sun.syndication.feed.synd.SyndEntryImpl;
+import com.sun.syndication.feed.synd.SyndFeed;
+import com.sun.syndication.fetcher.FeedFetcher;
+import com.sun.syndication.fetcher.FetcherEvent;
+import com.sun.syndication.fetcher.FetcherException;
+import com.sun.syndication.fetcher.FetcherListener;
+import com.sun.syndication.fetcher.impl.FeedFetcherCache;
+import com.sun.syndication.fetcher.impl.HashMapFeedInfoCache;
+import com.sun.syndication.fetcher.impl.HttpURLFeedFetcher;
+import com.sun.syndication.io.FeedException;
+
+public class RSSRecordReader implements IRecordReader<SyndEntryImpl> {
+
+    private static final Logger LOGGER = Logger.getLogger(RSSRecordReader.class.getName());
+    private boolean modified = false;
+    private Queue<SyndEntryImpl> rssFeedBuffer = new LinkedList<SyndEntryImpl>();
+    private FeedFetcherCache feedInfoCache;
+    private FeedFetcher fetcher;
+    private FetcherEventListenerImpl listener;
+    private URL feedUrl;
+    private GenericRecord<SyndEntryImpl> record = new GenericRecord<SyndEntryImpl>();
+    private boolean done = false;
+
+    public RSSRecordReader(String url) throws MalformedURLException {
+        feedUrl = new URL(url);
+        feedInfoCache = HashMapFeedInfoCache.getInstance();
+        fetcher = new HttpURLFeedFetcher(feedInfoCache);
+        listener = new FetcherEventListenerImpl(this, LOGGER);
+        fetcher.addFetcherEventListener(listener);
+    }
+
+    public boolean isModified() {
+        return modified;
+    }
+
+    @Override
+    public void close() throws IOException {
+        fetcher.removeFetcherEventListener(listener);
+    }
+
+    @Override
+    public void configure(Map<String, String> configurations) throws Exception {
+    }
+
+    @Override
+    public boolean hasNext() throws Exception {
+        return !done;
+    }
+
+    @Override
+    public IRawRecord<SyndEntryImpl> next() throws IOException {
+        if (done) {
+            return null;
+        }
+        try {
+            SyndEntryImpl feedEntry;
+            feedEntry = getNextRSSFeed();
+            if (feedEntry == null) {
+                return null;
+            }
+            record.set(feedEntry);
+            return record;
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    public boolean stop() {
+        done = true;
+        return true;
+    }
+
+    public void setModified(boolean modified) {
+        this.modified = modified;
+    }
+
+    private SyndEntryImpl getNextRSSFeed() throws Exception {
+        if (rssFeedBuffer.isEmpty()) {
+            fetchFeed();
+        }
+        if (rssFeedBuffer.isEmpty()) {
+            return null;
+        } else {
+            return rssFeedBuffer.remove();
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private void fetchFeed() throws IllegalArgumentException, IOException, FeedException, FetcherException {
+        // Retrieve the feed.
+        // We will get a Feed Polled Event and then a
+        // Feed Retrieved event (assuming the feed is valid)
+        SyndFeed feed = fetcher.retrieveFeed(feedUrl);
+        if (modified) {
+            if (LOGGER.isInfoEnabled()) {
+                LOGGER.info(feedUrl + " retrieved");
+                LOGGER.info(feedUrl + " has a title: " + feed.getTitle() + " and contains " + feed.getEntries().size()
+                        + " entries.");
+            }
+            List<? extends SyndEntryImpl> fetchedFeeds = feed.getEntries();
+            rssFeedBuffer.addAll(fetchedFeeds);
+        }
+    }
+}
+
+/**
+ * Fetcher listener that logs feed events and records on the owning
+ * {@link RSSRecordReader} whether the latest fetch delivered new content.
+ */
+class FetcherEventListenerImpl implements FetcherListener {
+
+    private final RSSRecordReader reader;
+    private final Logger LOGGER;
+
+    /**
+     * @param reader reader whose modified flag is updated on fetch events
+     * @param LOGGER logger used to report the events
+     */
+    public FetcherEventListenerImpl(RSSRecordReader reader, Logger LOGGER) {
+        this.reader = reader;
+        this.LOGGER = LOGGER;
+    }
+
+    /**
+     * Logs the event and updates the reader's modified flag: a retrieved feed
+     * marks the reader as modified, an unchanged feed clears the flag, and a
+     * polled event leaves it untouched.
+     *
+     * @see com.sun.syndication.fetcher.FetcherListener#fetcherEvent(com.sun.syndication.fetcher.FetcherEvent)
+     */
+    @Override
+    public void fetcherEvent(FetcherEvent event) {
+        String eventType = event.getEventType();
+        if (FetcherEvent.EVENT_TYPE_FEED_POLLED.equals(eventType)) {
+            if (LOGGER.isInfoEnabled()) {
+                LOGGER.info("\tEVENT: Feed Polled. URL = " + event.getUrlString());
+            }
+        } else if (FetcherEvent.EVENT_TYPE_FEED_RETRIEVED.equals(eventType)) {
+            if (LOGGER.isInfoEnabled()) {
+                LOGGER.info("\tEVENT: Feed Retrieved. URL = " + event.getUrlString());
+            }
+            reader.setModified(true);
+        } else if (FetcherEvent.EVENT_TYPE_FEED_UNCHANGED.equals(eventType)) {
+            if (LOGGER.isInfoEnabled()) {
+                LOGGER.info("\tEVENT: Feed Unchanged. URL = " + event.getUrlString());
+            }
+            // Nothing new was delivered; clearing the flag keeps the reader from
+            // buffering the same entries again.
+            reader.setModified(false);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
new file mode 100644
index 0000000..bbe485c
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.rss;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+import com.sun.syndication.feed.synd.SyndEntryImpl;
+
+/**
+ * Factory for {@link RSSRecordReader}s. The configured RSS URL value is split on
+ * commas and each resulting URL is served by its own partition.
+ */
+public class RSSRecordReaderFactory implements IRecordReaderFactory<SyndEntryImpl> {
+
+    private static final long serialVersionUID = 1L;
+    private Map<String, String> configuration;
+    private List<String> urls = new ArrayList<String>();
+
+    @Override
+    public DataSourceType getDataSourceType() {
+        return DataSourceType.RECORDS;
+    }
+
+    /** Requests as many partitions as there are configured feed URLs. */
+    @Override
+    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+        final int partitionCount = urls.size();
+        return new AlgebricksCountPartitionConstraint(partitionCount);
+    }
+
+    /**
+     * Stores the configuration and extracts the feed URLs from it.
+     *
+     * @throws IllegalArgumentException if the configuration has no RSS URL entry
+     */
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        this.configuration = configuration;
+        final String urlList = configuration.get(ExternalDataConstants.KEY_RSS_URL);
+        if (urlList == null) {
+            throw new IllegalArgumentException("no RSS URL provided");
+        }
+        initializeURLs(urlList);
+    }
+
+    /** Replaces the known URLs with the comma-separated parts of {@code url}. */
+    private void initializeURLs(String url) {
+        urls.clear();
+        for (String rssURL : url.split(",")) {
+            urls.add(rssURL);
+        }
+    }
+
+    @Override
+    public boolean isIndexible() {
+        return false;
+    }
+
+    /** Creates and configures a reader for the URL assigned to {@code partition}. */
+    @Override
+    public IRecordReader<? extends SyndEntryImpl> createRecordReader(IHyracksTaskContext ctx, int partition)
+            throws Exception {
+        final String partitionUrl = urls.get(partition);
+        final RSSRecordReader rssReader = new RSSRecordReader(partitionUrl);
+        rssReader.configure(configuration);
+        return rssReader;
+    }
+
+    @Override
+    public Class<? extends SyndEntryImpl> getRecordClass() {
+        return SyndEntryImpl.class;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReader.java
new file mode 100644
index 0000000..6225b82
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReader.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.stream;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.asterix.external.api.IDataFlowController;
+import org.apache.asterix.external.api.IExternalIndexer;
+import org.apache.asterix.external.api.IIndexingDatasource;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+import org.apache.asterix.external.input.record.CharArrayRecord;
+import org.apache.asterix.external.input.stream.AInputStream;
+import org.apache.asterix.external.input.stream.AInputStreamReader;
+import org.apache.asterix.external.util.ExternalDataConstants;
+
+public abstract class AbstractStreamRecordReader implements IRecordReader<char[]>, IIndexingDatasource {
+    protected AInputStreamReader reader;
+    protected CharArrayRecord record;
+    protected char[] inputBuffer;
+    protected int bufferLength = 0;
+    protected int bufferPosn = 0;
+    protected IExternalIndexer indexer;
+    protected boolean done = false;
+
+    @Override
+    public IRawRecord<char[]> next() throws IOException {
+        return record;
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (!done) {
+            reader.close();
+        }
+        done = true;
+    }
+
+    public void setInputStream(AInputStream inputStream) throws IOException {
+        this.reader = new AInputStreamReader(inputStream);
+    }
+
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        record = new CharArrayRecord();
+        inputBuffer = new char[ExternalDataConstants.DEFAULT_BUFFER_SIZE];
+    }
+
+    @Override
+    public IExternalIndexer getIndexer() {
+        return indexer;
+    }
+
+    @Override
+    public void setIndexer(IExternalIndexer indexer) {
+        this.indexer = indexer;
+    }
+
+    @Override
+    public boolean stop() {
+        try {
+            reader.stop();
+            return true;
+        } catch (Exception e) {
+            e.printStackTrace();
+            return false;
+        }
+    }
+
+    @Override
+    public void setController(IDataFlowController controller) {
+        reader.setController((AbstractFeedDataFlowController) controller);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReaderFactory.java
new file mode 100644
index 0000000..f02bd93
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReaderFactory.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.stream;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.external.api.IExternalIndexer;
+import org.apache.asterix.external.api.IIndexibleExternalDataSource;
+import org.apache.asterix.external.api.IIndexingDatasource;
+import org.apache.asterix.external.api.IInputStreamProvider;
+import org.apache.asterix.external.api.IInputStreamProviderFactory;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * Base factory for stream-backed record readers. Partitioning, indexing
+ * capability and the input streams themselves are delegated to an
+ * {@link IInputStreamProviderFactory}.
+ */
+public abstract class AbstractStreamRecordReaderFactory<T>
+        implements IRecordReaderFactory<T>, IIndexibleExternalDataSource {
+
+    private static final long serialVersionUID = 1L;
+    protected IInputStreamProviderFactory inputStreamFactory;
+    protected Map<String, String> configuration;
+
+    /**
+     * Sets the factory that supplies input streams.
+     *
+     * @return this factory, to allow call chaining
+     */
+    public AbstractStreamRecordReaderFactory<T> setInputStreamFactoryProvider(
+            IInputStreamProviderFactory inputStreamFactory) {
+        this.inputStreamFactory = inputStreamFactory;
+        return this;
+    }
+
+    @Override
+    public DataSourceType getDataSourceType() {
+        return DataSourceType.RECORDS;
+    }
+
+    @Override
+    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+        return inputStreamFactory.getPartitionConstraint();
+    }
+
+    /**
+     * Stores the configuration, configures the input stream factory and then
+     * lets the subclass configure itself.
+     */
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        this.configuration = configuration;
+        inputStreamFactory.configure(configuration);
+        configureStreamReaderFactory(configuration);
+    }
+
+    /** Subclass hook invoked at the end of {@link #configure(Map)}. */
+    protected abstract void configureStreamReaderFactory(Map<String, String> configuration) throws Exception;
+
+    @Override
+    public boolean isIndexible() {
+        return inputStreamFactory.isIndexible();
+    }
+
+    /** Forwards the snapshot to the input stream factory, cast to an indexible data source. */
+    @Override
+    public void setSnapshot(List<ExternalFile> files, boolean indexingOp) throws Exception {
+        final IIndexibleExternalDataSource indexible = (IIndexibleExternalDataSource) inputStreamFactory;
+        indexible.setSnapshot(files, indexingOp);
+    }
+
+    /** True only when the input stream factory is indexible and reports an indexing operation. */
+    @Override
+    public boolean isIndexingOp() {
+        return inputStreamFactory.isIndexible()
+                && ((IIndexibleExternalDataSource) inputStreamFactory).isIndexingOp();
+    }
+
+    /**
+     * Wires a freshly created reader to the input stream of {@code partition},
+     * attaches the provider's indexer when an indexing operation is running
+     * (otherwise {@code null}), and configures the reader.
+     *
+     * @return the same {@code recordReader}, ready for use
+     */
+    protected IRecordReader<char[]> configureReader(AbstractStreamRecordReader recordReader, IHyracksTaskContext ctx,
+            int partition) throws Exception {
+        final IInputStreamProvider streamProvider = inputStreamFactory.createInputStreamProvider(ctx, partition);
+        IExternalIndexer indexer = null;
+        if (inputStreamFactory.isIndexible()
+                && ((IIndexibleExternalDataSource) inputStreamFactory).isIndexingOp()) {
+            indexer = ((IIndexingDatasource) streamProvider).getIndexer();
+        }
+        recordReader.setInputStream(streamProvider.getInputStream());
+        recordReader.setIndexer(indexer);
+        recordReader.configure(configuration);
+        return recordReader;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
new file mode 100644
index 0000000..f8f7cda
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.stream;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
+
+/**
+ * Stream record reader that produces one record per line. Lines end at LF, at
+ * CR followed by LF, or at a CR followed by any other character.
+ */
+public class LineRecordReader extends AbstractStreamRecordReader {
+
+    protected boolean prevCharCR;
+    protected int newlineLength;
+    protected int recordNumber = 0;
+
+    /**
+     * Reads the next line from the stream into the shared record.
+     *
+     * @return {@code true} if a record was read; {@code false} if the reader is
+     *         done or the stream ended with no further data (the reader is then closed)
+     * @throws IOException if reading from the stream fails
+     */
+    @Override
+    public boolean hasNext() throws IOException {
+        if (done) {
+            return false;
+        }
+        /* We're reading data from the stream, but the head of the stream may be
+         * already buffered in inputBuffer, so we have several cases:
+         * 1. No newline characters are in the buffer, so we need to copy
+         *    everything and read another buffer from the stream.
+         * 2. An unambiguously terminated line is in buffer, so we just
+         *    copy to record.
+         * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
+         *    in CR.  In this case we copy everything up to CR to record, but
+         *    we also need to see what follows CR: if it's LF, then we
+         *    need to consume LF as well, so the next call to hasNext will read
+         *    from after that.
+         * We use a flag prevCharCR to signal if previous character was CR
+         * and, if it happens to be at the end of the buffer, delay
+         * consuming it until we have a chance to look at the char that
+         * follows.
+         */
+        newlineLength = 0; //length of terminating newline
+        prevCharCR = false; //true if prev char was CR
+        record.reset();
+        // True once anything has been appended to the record during this call. The
+        // EOF check must use this rather than the length of the last scan: a scan
+        // that consumed only a trailing CR has length 0 even though earlier buffers
+        // already contributed to the record.
+        boolean hasData = false;
+        do {
+            int startPosn = bufferPosn; //starting from where we left off the last time
+            if (bufferPosn >= bufferLength) {
+                startPosn = bufferPosn = 0;
+                bufferLength = reader.read(inputBuffer);
+                if (bufferLength <= 0) {
+                    if (hasData) {
+                        // last line of the stream has no terminating newline
+                        record.endRecord();
+                        recordNumber++;
+                        return true;
+                    }
+                    close();
+                    return false; //EOF
+                }
+            }
+            for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
+                if (inputBuffer[bufferPosn] == ExternalDataConstants.LF) {
+                    newlineLength = (prevCharCR) ? 2 : 1;
+                    ++bufferPosn; // at next invocation proceed from following char
+                    break;
+                }
+                if (prevCharCR) { //CR + notLF, we are at notLF
+                    newlineLength = 1;
+                    break;
+                }
+                prevCharCR = (inputBuffer[bufferPosn] == ExternalDataConstants.CR);
+            }
+            int readLength = bufferPosn - startPosn;
+            if (prevCharCR && newlineLength == 0) {
+                --readLength; //CR at the end of the buffer
+            }
+            if (readLength > 0) {
+                record.append(inputBuffer, startPosn, readLength);
+                hasData = true;
+            }
+        } while (newlineLength == 0);
+        recordNumber++;
+        return true;
+    }
+
+    /**
+     * Performs the base configuration and, when the configuration declares a
+     * header, reads one line up front so that it is skipped.
+     */
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        super.configure(configuration);
+        if (ExternalDataUtils.hasHeader(configuration)) {
+            if (hasNext()) {
+                next();
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReaderFactory.java
new file mode 100644
index 0000000..f0867d3
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReaderFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.stream;
+
+import java.util.Map;
+
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * Factory for line-oriented record readers. It creates a {@link QuotedLineRecordReader} when a value is
+ * configured under {@link ExternalDataConstants#KEY_QUOTE} and a plain {@link LineRecordReader} otherwise.
+ */
+public class LineRecordReaderFactory extends AbstractStreamRecordReaderFactory<char[]> {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Instantiates the reader matching the configuration and hands it to {@code configureReader}.
+     *
+     * @param ctx the task context passed through to {@code configureReader}
+     * @param partition the partition passed through to {@code configureReader}
+     * @return the reader returned by {@code configureReader}
+     * @throws Exception if configuring the reader fails
+     */
+    @Override
+    public IRecordReader<? extends char[]> createRecordReader(IHyracksTaskContext ctx, int partition) throws Exception {
+        final boolean quoteConfigured = configuration.get(ExternalDataConstants.KEY_QUOTE) != null;
+        final LineRecordReader recordReader = quoteConfigured ? new QuotedLineRecordReader() : new LineRecordReader();
+        return configureReader(recordReader, ctx, partition);
+    }
+
+    /**
+     * @return the record class produced by readers of this factory, {@code char[]}
+     */
+    @Override
+    public Class<? extends char[]> getRecordClass() {
+        return char[].class;
+    }
+
+    /**
+     * Intentionally empty: this factory performs no additional stream reader factory configuration.
+     */
+    @Override
+    protected void configureStreamReaderFactory(Map<String, String> configuration) throws Exception {
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
new file mode 100644
index 0000000..a8eb07b
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.stream;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataExceptionUtils;
+
+public class QuotedLineRecordReader extends LineRecordReader {
+
+    private char quote;
+    private boolean prevCharEscape;
+    private boolean inQuote;
+
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        super.configure(configuration);
+        String quoteString = configuration.get(ExternalDataConstants.KEY_QUOTE);
+        if (quoteString == null || quoteString.length() != 1) {
+            throw new AsterixException(ExternalDataExceptionUtils.incorrectParameterMessage(
+                    ExternalDataConstants.KEY_QUOTE, ExternalDataConstants.PARAMETER_OF_SIZE_ONE, quoteString));
+        }
+        this.quote = quoteString.charAt(0);
+    }
+
+    @Override
+    public boolean hasNext() throws IOException {
+        if (done) {
+            return false;
+        }
+        newlineLength = 0;
+        prevCharCR = false;
+        prevCharEscape = false;
+        record.reset();
+        int readLength = 0;
+        inQuote = false;
+        do {
+            int startPosn = bufferPosn;
+            if (bufferPosn >= bufferLength) {
+                startPosn = bufferPosn = 0;
+                bufferLength = reader.read(inputBuffer);
+                if (bufferLength <= 0) {
+                    {
+                        if (readLength > 0) {
+                            if (inQuote) {
+                                throw new IOException("malformed input record ended inside quote");
+                            }
+                            record.endRecord();
+                            recordNumber++;
+                            return true;
+                        }
+                        close();
+                        return false;
+                    }
+                }
+            }
+            for (; bufferPosn < bufferLength; ++bufferPosn) {
+                if (!inQuote) {
+                    if (inputBuffer[bufferPosn] == ExternalDataConstants.LF) {
+                        newlineLength = (prevCharCR) ? 2 : 1;
+                        ++bufferPosn;
+                        break;
+                    }
+                    if (prevCharCR) {
+                        newlineLength = 1;
+                        break;
+                    }
+                    prevCharCR = (inputBuffer[bufferPosn] == ExternalDataConstants.CR);
+                    if (inputBuffer[bufferPosn] == quote) {
+                        if (!prevCharEscape) {
+                            inQuote = true;
+                        }
+                    }
+                    if (prevCharEscape) {
+                        prevCharEscape = false;
+                    } else {
+                        prevCharEscape = inputBuffer[bufferPosn] == ExternalDataConstants.ESCAPE;
+                    }
+                } else {
+                    // only look for next quote
+                    if (inputBuffer[bufferPosn] == quote) {
+                        if (!prevCharEscape) {
+                            inQuote = false;
+                        }
+                    }
+                    prevCharEscape = inputBuffer[bufferPosn] == ExternalDataConstants.ESCAPE;
+                }
+            }
+            readLength = bufferPosn - startPosn;
+            if (prevCharCR && newlineLength == 0) {
+                --readLength;
+            }
+            if (readLength > 0) {
+                record.append(inputBuffer, startPosn, readLength);
+            }
+        } while (newlineLength == 0);
+        recordNumber++;
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
new file mode 100644
index 0000000..d469cb3
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.stream;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataExceptionUtils;
+
+/**
+ * Reads records that are enclosed by a configurable start and end character from a character stream.
+ * Nesting of the start/end characters is tracked with a depth counter, and delimiters that occur inside
+ * a string (between {@link ExternalDataConstants#QUOTE} characters) are ignored.
+ */
+public class SemiStructuredRecordReader extends AbstractStreamRecordReader {
+
+    private int depth;
+    private boolean prevCharEscape;
+    private boolean inString;
+    private char recordStart;
+    private char recordEnd;
+    private int recordNumber = 0;
+
+    /**
+     * @return the number of records read so far
+     */
+    public int getRecordNumber() {
+        return recordNumber;
+    }
+
+    /**
+     * Configures the parent reader and the record start/end characters. Each character is taken from
+     * the configuration when present and falls back to the corresponding default constant otherwise.
+     *
+     * @param configuration the reader configuration
+     * @throws AsterixException if a configured start or end value is not exactly one character long
+     * @throws Exception if the parent configuration fails
+     */
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        super.configure(configuration);
+        String recStartString = configuration.get(ExternalDataConstants.KEY_RECORD_START);
+        String recEndString = configuration.get(ExternalDataConstants.KEY_RECORD_END);
+        if (recStartString != null) {
+            if (recStartString.length() != 1) {
+                throw new AsterixException(
+                        ExternalDataExceptionUtils.incorrectParameterMessage(ExternalDataConstants.KEY_RECORD_START,
+                                ExternalDataConstants.PARAMETER_OF_SIZE_ONE, recStartString));
+            }
+            recordStart = recStartString.charAt(0);
+        } else {
+            recordStart = ExternalDataConstants.DEFAULT_RECORD_START;
+        }
+        if (recEndString != null) {
+            if (recEndString.length() != 1) {
+                throw new AsterixException(
+                        ExternalDataExceptionUtils.incorrectParameterMessage(ExternalDataConstants.KEY_RECORD_END,
+                                ExternalDataConstants.PARAMETER_OF_SIZE_ONE, recEndString));
+            }
+            recordEnd = recEndString.charAt(0);
+        } else {
+            recordEnd = ExternalDataConstants.DEFAULT_RECORD_END;
+        }
+    }
+
+    /**
+     * Reads the next record into {@code record}. Space, tab, LF and CR characters before the record
+     * start are skipped; any other character before the record start is treated as a malformed stream.
+     *
+     * @return {@code true} if a complete record was read; {@code false} if the reader is done or the
+     *         input is exhausted (the reader is then closed and a partially read record is not reported)
+     * @throws IOException if a character other than whitespace precedes the record start
+     * @throws Exception if reading from the underlying reader fails
+     */
+    @Override
+    public boolean hasNext() throws Exception {
+        if (done) {
+            return false;
+        }
+        record.reset();
+        boolean hasStarted = false;
+        boolean hasFinished = false;
+        prevCharEscape = false;
+        inString = false;
+        depth = 0;
+        do {
+            int startPosn = bufferPosn; //starting from where we left off the last time
+            if (bufferPosn >= bufferLength) {
+                startPosn = bufferPosn = 0;
+                bufferLength = reader.read(inputBuffer);
+                if (bufferLength <= 0) {
+                    close();
+                    return false; // EOF
+                }
+            }
+            if (!hasStarted) {
+                for (; bufferPosn < bufferLength; ++bufferPosn) { //search for record begin
+                    if (inputBuffer[bufferPosn] == recordStart) {
+                        startPosn = bufferPosn;
+                        hasStarted = true;
+                        depth = 1;
+                        ++bufferPosn; // at next invocation proceed from following byte
+                        break;
+                    } else if (inputBuffer[bufferPosn] != ExternalDataConstants.SPACE
+                            && inputBuffer[bufferPosn] != ExternalDataConstants.TAB
+                            && inputBuffer[bufferPosn] != ExternalDataConstants.LF
+                            && inputBuffer[bufferPosn] != ExternalDataConstants.CR) {
+                        // corrupted file. clear the buffer and stop reading
+                        reader.skipError();
+                        bufferPosn = bufferLength = 0;
+                        throw new IOException("Malformed input stream");
+                    }
+                }
+            }
+            if (hasStarted) {
+                for (; bufferPosn < bufferLength; ++bufferPosn) { //search for record end
+                    if (inString) {
+                        // we are in a string, we only care about the string end
+                        if (inputBuffer[bufferPosn] == ExternalDataConstants.QUOTE && !prevCharEscape) {
+                            inString = false;
+                        }
+                        // an escaped escape character must not escape the character that follows it
+                        if (prevCharEscape) {
+                            prevCharEscape = false;
+                        } else {
+                            prevCharEscape = inputBuffer[bufferPosn] == ExternalDataConstants.ESCAPE;
+                        }
+                    } else {
+                        if (inputBuffer[bufferPosn] == ExternalDataConstants.QUOTE) {
+                            inString = true;
+                        } else if (inputBuffer[bufferPosn] == recordStart) {
+                            depth += 1;
+                        } else if (inputBuffer[bufferPosn] == recordEnd) {
+                            depth -= 1;
+                            if (depth == 0) {
+                                hasFinished = true;
+                                ++bufferPosn; // at next invocation proceed from following byte
+                                break;
+                            }
+                        }
+                    }
+                }
+            }
+
+            // Append only once the record has started, so whitespace that was skipped while searching
+            // for the record start in an earlier buffer does not end up inside the record.
+            int appendLength = bufferPosn - startPosn;
+            if (hasStarted && appendLength > 0) {
+                record.append(inputBuffer, startPosn, appendLength);
+            }
+        } while (!hasFinished);
+        record.endRecord();
+        recordNumber++;
+        return true;
+    }
+
+    /**
+     * Stops the underlying reader.
+     *
+     * @return {@code true} if the underlying reader stopped without throwing; {@code false} otherwise
+     */
+    @Override
+    public boolean stop() {
+        try {
+            reader.stop();
+        } catch (Exception e) {
+            e.printStackTrace();
+            return false;
+        }
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReaderFactory.java
new file mode 100644
index 0000000..ec8eac9
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReaderFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.stream;
+
+import java.util.Map;
+
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * Factory that creates {@link SemiStructuredRecordReader} instances producing {@code char[]} records.
+ */
+public class SemiStructuredRecordReaderFactory extends AbstractStreamRecordReaderFactory<char[]> {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Creates a new {@link SemiStructuredRecordReader} and hands it to {@code configureReader}.
+     *
+     * @param ctx the task context passed through to {@code configureReader}
+     * @param partition the partition passed through to {@code configureReader}
+     * @return the reader returned by {@code configureReader}
+     * @throws Exception if configuring the reader fails
+     */
+    @Override
+    public IRecordReader<? extends char[]> createRecordReader(IHyracksTaskContext ctx, int partition) throws Exception {
+        return configureReader(new SemiStructuredRecordReader(), ctx, partition);
+    }
+
+    /**
+     * @return the record class produced by readers of this factory, {@code char[]}
+     */
+    @Override
+    public Class<? extends char[]> getRecordClass() {
+        return char[].class;
+    }
+
+    /**
+     * Intentionally empty: this factory performs no additional stream reader factory configuration.
+     */
+    @Override
+    protected void configureStreamReaderFactory(Map<String, String> configuration) throws Exception {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
new file mode 100644
index 0000000..084d6d0
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.twitter;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.input.record.GenericRecord;
+import org.apache.asterix.external.util.TwitterUtil;
+import org.apache.asterix.external.util.TwitterUtil.SearchAPIConstants;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+import twitter4j.Query;
+import twitter4j.QueryResult;
+import twitter4j.Status;
+import twitter4j.Twitter;
+import twitter4j.TwitterException;
+
+/**
+ * Record reader that periodically runs a Twitter search query and returns the resulting tweets one at a
+ * time. Each new search is restricted to tweets newer than the highest tweet id returned so far.
+ */
+public class TwitterPullRecordReader implements IRecordReader<Status> {
+
+    private String keywords;
+    private Query query;
+    private Twitter twitter;
+    private int requestInterval = 5; // seconds
+    private QueryResult result;
+    private int nextTweetIndex = 0;
+    private long lastTweetIdReceived = 0;
+    private GenericRecord<Status> record;
+
+    public TwitterPullRecordReader() {
+    }
+
+    /**
+     * Nothing to release: this reader holds no resource that is closed here.
+     */
+    @Override
+    public void close() throws IOException {
+    }
+
+    /**
+     * Creates the Twitter service and the search query from the configuration.
+     *
+     * @param configuration the reader configuration; the request interval (in seconds) is optional and
+     *            the field's default is kept when it is absent
+     * @throws NumberFormatException if a request interval is configured but is not a valid integer
+     * @throws Exception if the Twitter service cannot be created
+     */
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        twitter = TwitterUtil.getTwitterService(configuration);
+        keywords = configuration.get(SearchAPIConstants.QUERY);
+        String interval = configuration.get(SearchAPIConstants.INTERVAL);
+        if (interval != null) {
+            // only override the default when a value is configured; parseInt(null) would throw
+            requestInterval = Integer.parseInt(interval);
+        }
+        query = new Query(keywords);
+        query.setCount(100);
+        record = new GenericRecord<Status>();
+    }
+
+    /**
+     * @return always {@code true}; {@link #next()} may still return {@code null} when a search yields
+     *         no tweets
+     */
+    @Override
+    public boolean hasNext() throws Exception {
+        return true;
+    }
+
+    /**
+     * Returns the next tweet. When all tweets of the current result have been consumed, the calling
+     * thread sleeps for the request interval and a new search is issued.
+     *
+     * @return the shared record holding the next tweet, or {@code null} if the search returned no tweets
+     * @throws IOException if the Twitter search fails (wrapped in a {@link HyracksDataException})
+     * @throws InterruptedException if the thread is interrupted while waiting for the next request
+     */
+    @Override
+    public IRawRecord<Status> next() throws IOException, InterruptedException {
+        if (result == null || nextTweetIndex >= result.getTweets().size()) {
+            // long arithmetic avoids int overflow for large intervals
+            Thread.sleep(1000L * requestInterval);
+            query.setSinceId(lastTweetIdReceived);
+            try {
+                result = twitter.search(query);
+            } catch (TwitterException e) {
+                throw new HyracksDataException(e);
+            }
+            nextTweetIndex = 0;
+        }
+        if (result != null && !result.getTweets().isEmpty()) {
+            List<Status> tw = result.getTweets();
+            Status tweet = tw.get(nextTweetIndex++);
+            if (lastTweetIdReceived < tweet.getId()) {
+                lastTweetIdReceived = tweet.getId();
+            }
+            record.set(tweet);
+            return record;
+        } else {
+            return null;
+        }
+    }
+
+    /**
+     * @return always {@code false}
+     */
+    @Override
+    public boolean stop() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
new file mode 100644
index 0000000..764ac1d
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.twitter;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.input.record.GenericRecord;
+import org.apache.asterix.external.util.TwitterUtil;
+
+import twitter4j.FilterQuery;
+import twitter4j.StallWarning;
+import twitter4j.Status;
+import twitter4j.StatusDeletionNotice;
+import twitter4j.StatusListener;
+import twitter4j.TwitterStream;
+
+/**
+ * Record reader fed by a Twitter stream: a listener places incoming tweets on a queue and
+ * {@link #next()} takes them from that queue without blocking.
+ */
+public class TwitterPushRecordReader implements IRecordReader<Status> {
+    private LinkedBlockingQueue<Status> inputQ;
+    private TwitterStream twitterStream;
+    private GenericRecord<Status> record;
+    private boolean closed = false;
+
+    /**
+     * Removes the listeners from the Twitter stream and cleans it up. Calling this more than once, or
+     * before a stream was created by {@link #configure(Map)}, is safe.
+     */
+    @Override
+    public void close() throws IOException {
+        if (!closed) {
+            // the stream is null when configure() never ran or failed before creating it
+            if (twitterStream != null) {
+                twitterStream.clearListeners();
+                twitterStream.cleanUp();
+                twitterStream = null;
+            }
+            closed = true;
+        }
+    }
+
+    /**
+     * Creates the tweet queue and the Twitter stream and starts the stream, using the configured
+     * filter query when one is available and the sample stream otherwise.
+     *
+     * @param configuration the reader configuration
+     * @throws Exception if the Twitter stream or the filter query cannot be created
+     */
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        record = new GenericRecord<Status>();
+        inputQ = new LinkedBlockingQueue<Status>();
+        twitterStream = TwitterUtil.getTwitterStream(configuration);
+        twitterStream.addListener(new TweetListener(inputQ));
+        FilterQuery query = TwitterUtil.getFilterQuery(configuration);
+        if (query != null) {
+            twitterStream.filter(query);
+        } else {
+            twitterStream.sample();
+        }
+    }
+
+    /**
+     * @return {@code true} until the reader has been closed
+     */
+    @Override
+    public boolean hasNext() throws Exception {
+        return !closed;
+    }
+
+    /**
+     * Takes the next tweet from the queue without waiting.
+     *
+     * @return the shared record holding the next tweet, or {@code null} if no tweet is queued
+     */
+    @Override
+    public IRawRecord<Status> next() throws IOException, InterruptedException {
+        Status tweet = inputQ.poll();
+        if (tweet == null) {
+            return null;
+        }
+        record.set(tweet);
+        return record;
+    }
+
+    /**
+     * Stops the reader by closing it.
+     *
+     * @return {@code true} if closing succeeded, {@code false} if it threw an exception
+     */
+    @Override
+    public boolean stop() {
+        try {
+            close();
+        } catch (Exception e) {
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Stream listener that adds every received tweet to the given queue and ignores all other
+     * notifications. It is static because it does not use the enclosing reader instance.
+     */
+    private static class TweetListener implements StatusListener {
+
+        private final LinkedBlockingQueue<Status> inputQ;
+
+        public TweetListener(LinkedBlockingQueue<Status> inputQ) {
+            this.inputQ = inputQ;
+        }
+
+        @Override
+        public void onStatus(Status tweet) {
+            inputQ.add(tweet);
+        }
+
+        @Override
+        public void onException(Exception arg0) {
+            // stream exceptions are currently ignored
+        }
+
+        @Override
+        public void onDeletionNotice(StatusDeletionNotice arg0) {
+        }
+
+        @Override
+        public void onScrubGeo(long arg0, long arg1) {
+        }
+
+        @Override
+        public void onStallWarning(StallWarning arg0) {
+        }
+
+        @Override
+        public void onTrackLimitationNotice(int arg0) {
+        }
+    }
+}


Mime
View raw message