asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject [14/21] incubator-asterixdb git commit: First stage of external data cleanup
Date Sun, 03 Jan 2016 17:41:12 GMT
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/HDFSSeekableLineReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/HDFSSeekableLineReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/HDFSSeekableLineReader.java
deleted file mode 100644
index 7916a16..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/HDFSSeekableLineReader.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.input;
-
-import java.io.IOException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.io.Text;
-
-/**
- * A class that provides a line reader from an input stream which also allows performing seek operations
- */
-public class HDFSSeekableLineReader {
-    private static final int DEFAULT_BUFFER_SIZE = 32 * 1024;
-    private int bufferSize = DEFAULT_BUFFER_SIZE;
-    private FSDataInputStream reader;
-
-    private byte[] buffer;
-    // the number of bytes of real data in the buffer
-    private int bufferLength = 0;
-    // the current position in the buffer
-    private int bufferPosn = 0;
-
-    private long currentFilePos = 0L;
-
-    private static final byte CR = '\r';
-    private static final byte LF = '\n';
-
-    public static final String KEY_BUFFER_SIZE = "io.file.buffer.size";
-
-    /**
-     * Create a line reader that reads from the given stream using the
-     * default buffer-size (32k).
-     * 
-     * @param in
-     *            The input stream
-     * @throws IOException
-     */
-    public HDFSSeekableLineReader(FSDataInputStream in) throws IOException {
-        this(in, DEFAULT_BUFFER_SIZE);
-    }
-
-    /**
-     * Create a line reader that reads from the given stream using the
-     * given buffer-size.
-     * 
-     * @param in
-     *            The input stream
-     * @param bufferSize
-     *            Size of the read buffer
-     * @throws IOException
-     */
-    public HDFSSeekableLineReader(FSDataInputStream in, int bufferSize) throws IOException {
-        this.reader = in;
-        this.bufferSize = bufferSize;
-        this.buffer = new byte[this.bufferSize];
-        currentFilePos = in.getPos();
-    }
-
-    public HDFSSeekableLineReader() throws IOException {
-        this.bufferSize = DEFAULT_BUFFER_SIZE;
-        this.buffer = new byte[this.bufferSize];
-    }
-
-    /**
-     * Create a line reader that reads from the given stream using the <code>io.file.buffer.size</code> specified in the given <code>Configuration</code>.
-     * 
-     * @param in
-     *            input stream
-     * @param conf
-     *            configuration
-     * @throws IOException
-     */
-    public HDFSSeekableLineReader(FSDataInputStream in, Configuration conf) throws IOException {
-        this(in, conf.getInt(KEY_BUFFER_SIZE, DEFAULT_BUFFER_SIZE));
-    }
-
-    /**
-     * Read one line from the InputStream into the given Text. A line
-     * can be terminated by one of the following: '\n' (LF) , '\r' (CR),
-     * or '\r\n' (CR+LF). EOF also terminates an otherwise unterminated
-     * line.
-     *
-     * @param str
-     *            the object to store the given line (without newline)
-     * @param maxLineLength
-     *            the maximum number of bytes to store into str;
-     *            the rest of the line is silently discarded.
-     * @param maxBytesToConsume
-     *            the maximum number of bytes to consume
-     *            in this call. This is only a hint, because if the line cross
-     *            this threshold, we allow it to happen. It can overshoot
-     *            potentially by as much as one buffer length.
-     * @return the number of bytes read including the (longest) newline
-     *         found.
-     * @throws IOException
-     *             if the underlying stream throws
-     */
-    public int readLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException {
-        /* We're reading data from in, but the head of the stream may be
-         * already buffered in buffer, so we have several cases:
-         * 1. No newline characters are in the buffer, so we need to copy
-         *    everything and read another buffer from the stream.
-         * 2. An unambiguously terminated line is in buffer, so we just
-         *    copy to str.
-         * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
-         *    in CR.  In this case we copy everything up to CR to str, but
-         *    we also need to see what follows CR: if it's LF, then we
-         *    need consume LF as well, so next call to readLine will read
-         *    from after that.
-         * We use a flag prevCharCR to signal if previous character was CR
-         * and, if it happens to be at the end of the buffer, delay
-         * consuming it until we have a chance to look at the char that
-         * follows.
-         */
-        str.clear();
-        int txtLength = 0; //tracks str.getLength(), as an optimization
-        int newlineLength = 0; //length of terminating newline
-        boolean prevCharCR = false; //true of prev char was CR
-        long bytesConsumed = 0;
-        do {
-            int startPosn = bufferPosn; //starting from where we left off the last time
-            if (bufferPosn >= bufferLength) {
-                startPosn = bufferPosn = 0;
-                if (prevCharCR)
-                    ++bytesConsumed; //account for CR from previous read
-                bufferLength = reader.read(buffer);
-                if (bufferLength <= 0)
-                    break; // EOF
-            }
-            for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
-                if (buffer[bufferPosn] == LF) {
-                    newlineLength = (prevCharCR) ? 2 : 1;
-                    ++bufferPosn; // at next invocation proceed from following byte
-                    break;
-                }
-                if (prevCharCR) { //CR + notLF, we are at notLF
-                    newlineLength = 1;
-                    break;
-                }
-                prevCharCR = (buffer[bufferPosn] == CR);
-            }
-            int readLength = bufferPosn - startPosn;
-            if (prevCharCR && newlineLength == 0)
-                --readLength; //CR at the end of the buffer
-            bytesConsumed += readLength;
-            int appendLength = readLength - newlineLength;
-            if (appendLength > maxLineLength - txtLength) {
-                appendLength = maxLineLength - txtLength;
-            }
-            if (appendLength > 0) {
-                str.append(buffer, startPosn, appendLength);
-                txtLength += appendLength;
-            }
-        } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);
-
-        if (bytesConsumed > (long) Integer.MAX_VALUE)
-            throw new IOException("Too many bytes before newline: " + bytesConsumed);
-        currentFilePos = reader.getPos() - bufferLength + bufferPosn;
-        return (int) bytesConsumed;
-    }
-
-    /**
-     * Read from the InputStream into the given Text.
-     * 
-     * @param str
-     *            the object to store the given line
-     * @param maxLineLength
-     *            the maximum number of bytes to store into str.
-     * @return the number of bytes read including the newline
-     * @throws IOException
-     *             if the underlying stream throws
-     */
-    public int readLine(Text str, int maxLineLength) throws IOException {
-        return readLine(str, maxLineLength, Integer.MAX_VALUE);
-    }
-
-    /**
-     * Read from the InputStream into the given Text.
-     * 
-     * @param str
-     *            the object to store the given line
-     * @return the number of bytes read including the newline
-     * @throws IOException
-     *             if the underlying stream throws
-     */
-    public int readLine(Text str) throws IOException {
-        return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
-    }
-
-    public void seek(long desired) throws IOException {
-        if (reader.getPos() <= desired || currentFilePos > desired) {
-            // desired position is ahead of stream or before the current position, seek to position
-            reader.seek(desired);
-            bufferLength = 0;
-            bufferPosn = 0;
-            currentFilePos = desired;
-        } else if (currentFilePos < desired) {
-            // desired position is in the buffer
-            int difference = (int) (desired - currentFilePos);
-            bufferPosn += difference;
-            currentFilePos = desired;
-        }
-    }
-
-    public FSDataInputStream getReader() {
-        return reader;
-    }
-
-    public void resetReader(FSDataInputStream reader) throws IOException {
-        this.reader = reader;
-        bufferLength = 0;
-        bufferPosn = 0;
-        currentFilePos = reader.getPos();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/ILookupReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/ILookupReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/ILookupReader.java
deleted file mode 100644
index d48aaf7..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/ILookupReader.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.input;
-
-public interface ILookupReader {
-    public Object read(int fileNumber, long recordOffset) throws Exception;
-    public void close();
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/RCFileDataReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/RCFileDataReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/RCFileDataReader.java
deleted file mode 100644
index 50853d4..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/RCFileDataReader.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.input;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.asterix.external.indexing.ExternalFile;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
-
-//Used in two cases:
-//1. building an index over a dataset
-//2. performing full scan over a dataset that has built index (to provide consistent view) with RCFile format
-
-@SuppressWarnings("rawtypes")
-public class RCFileDataReader extends AbstractHDFSReader {
-
-    private RecordReader reader;
-    private Object key;
-    private Object value;
-    private int currentSplitIndex = 0;
-    private String fileName;
-    private long recordGroupOffset;
-    private long nextRecordGroupOffset;
-    private boolean executed[];
-    private InputSplit[] inputSplits;
-    private String[] readSchedule;
-    private String nodeName;
-    private JobConf conf;
-    private List<ExternalFile> files;
-    private FileSystem hadoopFS;
-
-    public RCFileDataReader(InputSplit[] inputSplits, String[] readSchedule, String nodeName, JobConf conf,
-            boolean executed[], List<ExternalFile> files) throws IOException {
-        this.executed = executed;
-        this.inputSplits = inputSplits;
-        this.readSchedule = readSchedule;
-        this.nodeName = nodeName;
-        this.conf = conf;
-        this.files = files;
-        hadoopFS = FileSystem.get(conf);
-    }
-
-    private boolean moveToNext() throws IOException {
-        for (; currentSplitIndex < inputSplits.length; currentSplitIndex++) {
-            /**
-             * read all the partitions scheduled to the current node
-             */
-            if (readSchedule[currentSplitIndex].equals(nodeName)) {
-                /**
-                 * pick an unread split to read synchronize among
-                 * simultaneous partitions in the same machine
-                 */
-                synchronized (executed) {
-                    if (executed[currentSplitIndex] == false) {
-                        executed[currentSplitIndex] = true;
-                    } else {
-                        continue;
-                    }
-                }
-
-                /**
-                 * read the split
-                 */
-                try {
-                    if (files != null) {
-                        fileName = ((FileSplit) (inputSplits[currentSplitIndex])).getPath().toUri().getPath();
-                        FileStatus fileStatus = hadoopFS.getFileStatus(new Path(fileName));
-                        //skip if not the same file stored in the files snapshot
-                        if (fileStatus.getModificationTime() != files.get(currentSplitIndex).getLastModefiedTime()
-                                .getTime())
-                            continue;
-                    }
-                    reader = getRecordReader(currentSplitIndex);
-                    recordGroupOffset = -1;
-                    nextRecordGroupOffset = reader.getPos();
-                } catch (Exception e) {
-                    continue;
-                }
-                key = reader.createKey();
-                value = reader.createValue();
-                return true;
-            }
-        }
-        return false;
-    }
-
-    @Override
-    public int read(byte[] buffer, int offset, int len) throws IOException {
-        throw new NotImplementedException("Use readNext()");
-    }
-
-    @Override
-    public int read() throws IOException {
-        throw new NotImplementedException("Use readNext()");
-    }
-
-    private RecordReader getRecordReader(int slitIndex) throws IOException {
-        RecordReader reader;
-        try {
-            reader = conf.getInputFormat().getRecordReader(inputSplits[slitIndex], conf, getReporter());
-        } catch (Exception e) {
-            e.printStackTrace();
-            throw e;
-        }
-        return reader;
-    }
-
-    @Override
-    public boolean initialize() throws IOException {
-        return moveToNext();
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public Object readNext() throws IOException {
-
-        if (reader == null) {
-            return null;
-        }
-        if (reader.next(key, value)) {
-            if (reader.getPos() != nextRecordGroupOffset) {
-                recordGroupOffset = nextRecordGroupOffset;
-                nextRecordGroupOffset = reader.getPos();
-            }
-            return value;
-        }
-        while (moveToNext()) {
-            if (reader.next(key, value)) {
-                if (reader.getPos() != nextRecordGroupOffset) {
-                    recordGroupOffset = nextRecordGroupOffset;
-                    nextRecordGroupOffset = reader.getPos();
-                }
-                return value;
-            }
-        }
-        return null;
-    }
-
-    @Override
-    public String getFileName() throws Exception {
-        return files.get(currentSplitIndex).getFileName();
-    }
-
-    @Override
-    public long getReaderPosition() throws Exception {
-        return recordGroupOffset;
-    }
-
-    @Override
-    public int getFileNumber() throws Exception {
-        return files.get(currentSplitIndex).getFileNumber();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/RCFileLookupReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/RCFileLookupReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/RCFileLookupReader.java
deleted file mode 100644
index f312228..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/RCFileLookupReader.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.input;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-
-import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
-import org.apache.asterix.external.indexing.ExternalFile;
-import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.io.RCFile.Reader;
-import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-
-public class RCFileLookupReader {
-    private FileSystem fs;
-    private Configuration conf;
-    private int fileNumber = -1;
-    private int rowNumber;
-    private long recordGroupOffset;
-    private Reader reader;
-    boolean skipFile = false;
-    private LongWritable rcKey = new LongWritable();
-    private BytesRefArrayWritable rcValue = new BytesRefArrayWritable();
-    private ExternalFile currentFile = new ExternalFile(null, null, 0, null, null, 0L,
-            ExternalFilePendingOp.PENDING_NO_OP);
-    private ExternalFileIndexAccessor filesIndexAccessor;
-
-    public RCFileLookupReader(ExternalFileIndexAccessor filesIndexAccessor, Configuration conf) throws IOException {
-        fs = FileSystem.get(conf);
-        this.conf = conf;
-        this.filesIndexAccessor = filesIndexAccessor;
-    }
-
-    public Writable read(int fileNumber, long recordGroupOffset, int rowNumber) throws Exception {
-        if (fileNumber != this.fileNumber) {
-            filesIndexAccessor.searchForFile(fileNumber, currentFile);
-            try {
-                FileStatus fileStatus = fs.getFileStatus(new Path(currentFile.getFileName()));
-                if (fileStatus.getModificationTime() != currentFile.getLastModefiedTime().getTime()) {
-                    this.fileNumber = fileNumber;
-                    skipFile = true;
-                    return null;
-                } else {
-                    this.fileNumber = fileNumber;
-                    skipFile = false;
-                }
-            } catch (FileNotFoundException e) {
-                // Couldn't find file, skip it
-                this.fileNumber = fileNumber;
-                skipFile = true;
-                return null;
-            }
-            // Close old file and open new one
-            if (reader != null)
-                reader.close();
-            reader = new Reader(fs, new Path(currentFile.getFileName()), conf);
-            this.recordGroupOffset = -1;
-            this.rowNumber = -1;
-        } else if (skipFile) {
-            return null;
-        }
-        // Seek to the record group if needed
-        if (recordGroupOffset != this.recordGroupOffset) {
-            this.recordGroupOffset = recordGroupOffset;
-            if (reader.getPosition() != recordGroupOffset)
-                reader.seek(recordGroupOffset);
-            reader.resetBuffer();
-            this.rowNumber = -1;
-        }
-
-        // skip rows to the record row
-        while (this.rowNumber < rowNumber) {
-            reader.next(rcKey);
-            reader.getCurrentRow(rcValue);
-            this.rowNumber++;
-        }
-        return rcValue;
-    }
-
-    public void close() throws Exception {
-        if (reader != null)
-            reader.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/SequenceFileLookupInputStream.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/SequenceFileLookupInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/SequenceFileLookupInputStream.java
deleted file mode 100644
index e787921..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/SequenceFileLookupInputStream.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.input;
-
-import java.io.IOException;
-
-import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.util.ReflectionUtils;
-
-@SuppressWarnings("deprecation")
-public class SequenceFileLookupInputStream extends AbstractHDFSLookupInputStream {
-
-    private SequenceFile.Reader reader;
-    private Writable seqKey;
-    private Text seqValue = new Text();
-    private Configuration conf;
-
-    public SequenceFileLookupInputStream(ExternalFileIndexAccessor fileIndexAccessor, JobConf conf) throws IOException {
-        super(fileIndexAccessor, conf);
-        this.conf = conf;
-    }
-
-    @Override
-    protected void openFile(String fileName) throws IOException {
-        if (reader != null) {
-            reader.close();
-        }
-        reader = new SequenceFile.Reader(fs, new Path(fileName), conf);
-        seqKey = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
-    }
-
-    @Override
-    public void close() throws IOException {
-        if (reader != null) {
-            reader.close();
-        }
-        super.close();
-    }
-
-    @Override
-    protected boolean read(long recordOffset) {
-        try {
-            reader.seek(recordOffset);
-            reader.next(seqKey, seqValue);
-            pendingValue = seqValue.toString();
-            return true;
-        } catch (IOException e) {
-            // Same Question: seek and read failed afer openning file succeede, should we do something about it?
-            e.printStackTrace();
-            return false;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/SequenceFileLookupReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/SequenceFileLookupReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/SequenceFileLookupReader.java
deleted file mode 100644
index 76b3660..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/SequenceFileLookupReader.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.input;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-
-import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
-import org.apache.asterix.external.indexing.ExternalFile;
-import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.Reader;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.util.ReflectionUtils;
-
-public class SequenceFileLookupReader implements ILookupReader {
-
-    private Reader reader;
-    private Writable key;
-    private Writable value;
-    private FileSystem fs;
-    private int fileNumber = -1;
-    private boolean skipFile = false;
-    private ExternalFile file = new ExternalFile(null, null, 0, null, null, 0L, ExternalFilePendingOp.PENDING_NO_OP);
-    private ExternalFileIndexAccessor filesIndexAccessor;
-    private Configuration conf;
-
-    public SequenceFileLookupReader(ExternalFileIndexAccessor filesIndexAccessor, Configuration conf)
-            throws IOException {
-        fs = FileSystem.get(conf);
-        this.filesIndexAccessor = filesIndexAccessor;
-        this.conf = conf;
-    }
-
-    @Override
-    public Writable read(int fileNumber, long recordOffset) throws Exception {
-        if (fileNumber != this.fileNumber) {
-            //get file name
-            this.fileNumber = fileNumber;
-            filesIndexAccessor.searchForFile(fileNumber, file);
-            try {
-                FileStatus fileStatus = fs.getFileStatus(new Path(file.getFileName()));
-                if (fileStatus.getModificationTime() != file.getLastModefiedTime().getTime()) {
-                    this.fileNumber = fileNumber;
-                    skipFile = true;
-                    return null;
-                } else {
-                    this.fileNumber = fileNumber;
-                    skipFile = false;
-                    openFile(file.getFileName());
-                }
-            } catch (FileNotFoundException e) {
-                // file was not found, do nothing and skip its tuples
-                this.fileNumber = fileNumber;
-                skipFile = true;
-                return null;
-            }
-        } else if (skipFile) {
-            return null;
-        }
-        reader.seek(recordOffset);
-        reader.next(key, value);
-        return value;
-    }
-
-    @SuppressWarnings("deprecation")
-    private void openFile(String FileName) throws IOException {
-        if (reader != null)
-            try {
-                reader.close();
-            } catch (IOException e) {
-                e.printStackTrace();
-            }
-        reader = new SequenceFile.Reader(fs, new Path(FileName), conf);
-        key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
-        value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
-    }
-
-    @Override
-    public void close() {
-        if (reader != null)
-            try {
-                reader.close();
-            } catch (IOException e) {
-                e.printStackTrace();
-            }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextFileLookupInputStream.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextFileLookupInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextFileLookupInputStream.java
deleted file mode 100644
index ea82c18..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextFileLookupInputStream.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.input;
-
-import java.io.IOException;
-
-import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-
-public class TextFileLookupInputStream extends AbstractHDFSLookupInputStream {
-
-    private HDFSSeekableLineReader lineReader = new HDFSSeekableLineReader();
-    private Text value = new Text();
-
-    public TextFileLookupInputStream(ExternalFileIndexAccessor filesIndexAccessor, JobConf conf) throws IOException {
-        super(filesIndexAccessor, conf);
-    }
-
-    @Override
-    public void openFile(String FileName) throws IOException {
-        if (lineReader.getReader() != null) {
-            lineReader.getReader().close();
-        }
-        lineReader.resetReader(fs.open(new Path(FileName)));
-    }
-
-    @Override
-    public void close() {
-        if (lineReader.getReader() != null) {
-            try {
-                lineReader.getReader().close();
-            } catch (IOException e) {
-                e.printStackTrace();
-            }
-        }
-    }
-
-    @Override
-    protected boolean read(long recordOffset) {
-        try {
-            lineReader.seek(recordOffset);
-            lineReader.readLine(value);
-            pendingValue = value.toString();
-            return true;
-        } catch (IOException e) {
-            // file was opened and then when trying to seek and read, an error occurred <- should we throw an exception ???->
-            e.printStackTrace();
-            return false;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextFileLookupReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextFileLookupReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextFileLookupReader.java
deleted file mode 100644
index 5864df2..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextFileLookupReader.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.input;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-
-import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
-import org.apache.asterix.external.indexing.ExternalFile;
-import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-
-public class TextFileLookupReader implements ILookupReader {
-    private FileSystem fs;
-    private int fileNumber = -1;
-    private boolean skipFile = false;
-    private ExternalFile file = new ExternalFile(null, null, 0, null, null, 0L, ExternalFilePendingOp.PENDING_NO_OP);
-    private ExternalFileIndexAccessor filesIndexAccessor;
-    private HDFSSeekableLineReader lineReader;
-    private Text value = new Text();
-
-    public TextFileLookupReader(ExternalFileIndexAccessor filesIndexAccessor, Configuration conf) throws IOException {
-        this.fs = FileSystem.get(conf);
-        this.filesIndexAccessor = filesIndexAccessor;
-        this.lineReader = new HDFSSeekableLineReader();
-    }
-
-    @Override
-    public String read(int fileNumber, long recordOffset) throws Exception {
-        if (fileNumber != this.fileNumber) {
-            this.fileNumber = fileNumber;
-            filesIndexAccessor.searchForFile(fileNumber, file);
-
-            try {
-                FileStatus fileStatus = fs.getFileStatus(new Path(file.getFileName()));
-                if (fileStatus.getModificationTime() != file.getLastModefiedTime().getTime()) {
-                    this.fileNumber = fileNumber;
-                    skipFile = true;
-                    return null;
-                } else {
-                    this.fileNumber = fileNumber;
-                    skipFile = false;
-                    openFile(file.getFileName());
-                }
-            } catch (FileNotFoundException e) {
-                // File is not there, skip it and do nothing
-                this.fileNumber = fileNumber;
-                skipFile = true;
-                return null;
-            }
-        } else if (skipFile) {
-            return null;
-        }
-        lineReader.seek(recordOffset);
-        lineReader.readLine(value);
-        return value.toString();
-    }
-
-    private void openFile(String FileName) throws IOException {
-        if (lineReader.getReader() != null) {
-            lineReader.getReader().close();
-        }
-        lineReader.resetReader(fs.open(new Path(FileName)));
-    }
-
-    @Override
-    public void close() {
-        if (lineReader.getReader() != null) {
-            try {
-                lineReader.getReader().close();
-            } catch (IOException e) {
-                e.printStackTrace();
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextualDataReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextualDataReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextualDataReader.java
deleted file mode 100644
index 5e4f013..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextualDataReader.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.input;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.asterix.external.indexing.ExternalFile;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
-
-// Used in two cases:
-// 1. building an index over a dataset
-// 2. performing full scan over a dataset that has built index (to provide consistent view)
-
-@SuppressWarnings("rawtypes")
-public class TextualDataReader extends AbstractHDFSReader {
-
-    private RecordReader<Object, Text> reader;
-    private Object key;
-    private Text value;
-    private boolean hasMore = false;
-    private int EOL = "\n".getBytes()[0];
-    private Text pendingValue = null;
-    private int currentSplitIndex = 0;
-    private String fileName;
-    private long recordOffset;
-    private boolean executed[];
-    private InputSplit[] inputSplits;
-    private String[] readSchedule;
-    private String nodeName;
-    private JobConf conf;
-    private List<ExternalFile> files;
-    private FileSystem hadoopFS;
-
-    public TextualDataReader(InputSplit[] inputSplits, String[] readSchedule, String nodeName, JobConf conf,
-            boolean executed[], List<ExternalFile> files) throws IOException {
-        this.executed = executed;
-        this.inputSplits = inputSplits;
-        this.readSchedule = readSchedule;
-        this.nodeName = nodeName;
-        this.conf = conf;
-        this.files = files;
-        hadoopFS = FileSystem.get(conf);
-    }
-
-    @Override
-    public boolean initialize() throws Exception {
-        return moveToNext();
-    }
-
-    @Override
-    public Object readNext() throws Exception {
-        if (reader == null) {
-            return null;
-        }
-        recordOffset = reader.getPos();
-        if (reader.next(key, value)) {
-            return value;
-        }
-        while (moveToNext()) {
-            recordOffset = reader.getPos();
-            if (reader.next(key, value)) {
-                return value;
-            }
-        }
-        return null;
-    }
-
-    @Override
-    public int getFileNumber() throws Exception {
-        return files.get(currentSplitIndex).getFileNumber();
-    }
-
-    @Override
-    public String getFileName() throws Exception {
-        return files.get(currentSplitIndex).getFileName();
-    }
-
-    @Override
-    public long getReaderPosition() throws Exception {
-        return recordOffset;
-    }
-
-    @Override
-    public int read() throws IOException {
-        throw new NotImplementedException("Use read(byte[], int, int");
-    }
-
-    @SuppressWarnings("unchecked")
-    private boolean moveToNext() throws IOException {
-        for (; currentSplitIndex < inputSplits.length; currentSplitIndex++) {
-            /**
-             * read all the partitions scheduled to the current node
-             */
-            if (readSchedule[currentSplitIndex].equals(nodeName)) {
-                /**
-                 * pick an unread split to read synchronize among
-                 * simultaneous partitions in the same machine
-                 */
-                synchronized (executed) {
-                    if (executed[currentSplitIndex] == false) {
-                        executed[currentSplitIndex] = true;
-                    } else {
-                        continue;
-                    }
-                }
-
-                /**
-                 * read the split
-                 */
-                try {
-                    if (files != null) {
-                        fileName = ((FileSplit) (inputSplits[currentSplitIndex])).getPath().toUri().getPath();
-                        FileStatus fileStatus = hadoopFS.getFileStatus(new Path(fileName));
-                        // Skip if not the same file stored in the files snapshot
-                        if (fileStatus.getModificationTime() != files.get(currentSplitIndex).getLastModefiedTime()
-                                .getTime())
-                            continue;
-                    }
-                    // It is the same file
-                    reader = getRecordReader(currentSplitIndex);
-                } catch (Exception e) {
-                    // ignore exceptions <-- This might change later -->
-                    continue;
-                }
-                key = reader.createKey();
-                value = reader.createValue();
-                return true;
-            }
-        }
-        return false;
-    }
-
-    private RecordReader getRecordReader(int splitIndex) throws IOException {
-        RecordReader reader;
-        if (conf.getInputFormat() instanceof SequenceFileInputFormat) {
-            SequenceFileInputFormat format = (SequenceFileInputFormat) conf.getInputFormat();
-            reader = format.getRecordReader(inputSplits[splitIndex], conf, getReporter());
-        } else {
-            TextInputFormat format = (TextInputFormat) conf.getInputFormat();
-            reader = format.getRecordReader(inputSplits[splitIndex], conf, getReporter());
-        }
-        return reader;
-    }
-
-    // Return one record at a time <to preserve the indexing information>
-    @Override
-    public int read(byte[] buffer, int offset, int len) throws IOException {
-        if (reader == null) {
-            if (!moveToNext()) {
-                // nothing to read
-                return -1;
-            }
-        }
-
-        int numBytes = 0;
-        if (pendingValue != null) {
-            int sizeOfNextTuple = pendingValue.getLength() + 1;
-            if (sizeOfNextTuple > len) {
-                return 0;
-            }
-            System.arraycopy(pendingValue.getBytes(), 0, buffer, offset + numBytes, pendingValue.getLength());
-            buffer[offset + numBytes + pendingValue.getLength()] = (byte) EOL;
-            numBytes += pendingValue.getLength() + 1;
-            pendingValue = null;
-            return numBytes;
-        }
-        if (numBytes < len) {
-            //store the byte location
-            recordOffset = reader.getPos();
-            hasMore = reader.next(key, value);
-            if (!hasMore) {
-                while (moveToNext()) {
-                    //store the byte location
-                    recordOffset = reader.getPos();
-                    hasMore = reader.next(key, value);
-                    if (hasMore) {
-                        //return the value read
-                        int sizeOfNextTuple = value.getLength() + 1;
-                        if (numBytes + sizeOfNextTuple > len) {
-                            pendingValue = value;
-                            return 0;
-                        } else {
-                            System.arraycopy(value.getBytes(), 0, buffer, offset + numBytes, value.getLength());
-                            buffer[offset + numBytes + value.getLength()] = (byte) EOL;
-                            numBytes += sizeOfNextTuple;
-                            return numBytes;
-                        }
-                    }
-                }
-                return -1;
-            } else {
-                //return the value read
-                int sizeOfNextTuple = value.getLength() + 1;
-                if (numBytes + sizeOfNextTuple > len) {
-                    pendingValue = value;
-                    return 0;
-                } else {
-                    System.arraycopy(value.getBytes(), 0, buffer, offset + numBytes, value.getLength());
-                    buffer[offset + numBytes + value.getLength()] = (byte) EOL;
-                    numBytes += sizeOfNextTuple;
-                    return numBytes;
-                }
-            }
-        }
-        return numBytes;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextualFullScanDataReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextualFullScanDataReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextualFullScanDataReader.java
deleted file mode 100644
index 9fe09a2..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextualFullScanDataReader.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.input;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hadoop.mapred.Counters.Counter;
-
-import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
-
-public class TextualFullScanDataReader extends InputStream {
-
-    private RecordReader<Object, Text> reader;
-    private Object key;
-    private Text value;
-    private boolean hasMore = false;
-    private int EOL = "\n".getBytes()[0];
-    private Text pendingValue = null;
-    private int currentSplitIndex = 0;
-    private boolean executed[];
-    private InputSplit[] inputSplits;
-    private String[] readSchedule;
-    private String nodeName;
-    private JobConf conf;
-
-    public TextualFullScanDataReader(boolean executed[], InputSplit[] inputSplits, String[] readSchedule,
-            String nodeName, JobConf conf) {
-        this.executed = executed;
-        this.inputSplits = inputSplits;
-        this.readSchedule = readSchedule;
-        this.nodeName = nodeName;
-        this.conf = conf;
-    }
-
-    @Override
-    public int available() {
-        return 1;
-    }
-
-    @SuppressWarnings("unchecked")
-    private boolean moveToNext() throws IOException {
-        for (; currentSplitIndex < inputSplits.length; currentSplitIndex++) {
-            /**
-             * read all the partitions scheduled to the current node
-             */
-            if (readSchedule[currentSplitIndex].equals(nodeName)) {
-                /**
-                 * pick an unread split to read synchronize among
-                 * simultaneous partitions in the same machine
-                 */
-                synchronized (executed) {
-                    if (executed[currentSplitIndex] == false) {
-                        executed[currentSplitIndex] = true;
-                    } else {
-                        continue;
-                    }
-                }
-
-                /**
-                 * read the split
-                 */
-                reader = getRecordReader(currentSplitIndex);
-                key = reader.createKey();
-                value = (Text) reader.createValue();
-                return true;
-            }
-        }
-        return false;
-    }
-
-    @Override
-    public int read(byte[] buffer, int offset, int len) throws IOException {
-        if (reader == null) {
-            if (!moveToNext()) {
-                // nothing to read
-                return -1;
-            }
-        }
-
-        int numBytes = 0;
-        if (pendingValue != null) {
-            int sizeOfNextTuple = pendingValue.getLength() + 1;
-            if (sizeOfNextTuple > len) {
-                return 0;
-            }
-            System.arraycopy(pendingValue.getBytes(), 0, buffer, offset + numBytes, pendingValue.getLength());
-            buffer[offset + numBytes + pendingValue.getLength()] = (byte) EOL;
-            numBytes += pendingValue.getLength() + 1;
-            pendingValue = null;
-        }
-
-        while (numBytes < len) {
-            hasMore = reader.next(key, value);
-            if (!hasMore) {
-                while (moveToNext()) {
-                    hasMore = reader.next(key, value);
-                    if (hasMore) {
-                        // move to the next non-empty split
-                        break;
-                    }
-                }
-            }
-            if (!hasMore) {
-                return (numBytes == 0) ? -1 : numBytes;
-            }
-            int sizeOfNextTuple = value.getLength() + 1;
-            if (numBytes + sizeOfNextTuple > len) {
-                // cannot add tuple to current buffer
-                // but the reader has moved pass the fetched tuple
-                // we need to store this for a subsequent read call.
-                // and return this then.
-                pendingValue = value;
-                break;
-            } else {
-                System.arraycopy(value.getBytes(), 0, buffer, offset + numBytes, value.getLength());
-                buffer[offset + numBytes + value.getLength()] = (byte) EOL;
-                numBytes += sizeOfNextTuple;
-            }
-        }
-        return numBytes;
-    }
-
-    @Override
-    public int read() throws IOException {
-        throw new NotImplementedException("Use read(byte[], int, int");
-    }
-
-    @SuppressWarnings("rawtypes")
-    private RecordReader getRecordReader(int splitIndex) throws IOException {
-        if (conf.getInputFormat() instanceof SequenceFileInputFormat) {
-            SequenceFileInputFormat format = (SequenceFileInputFormat) conf.getInputFormat();
-            RecordReader reader = format.getRecordReader((org.apache.hadoop.mapred.FileSplit) inputSplits[splitIndex],
-                    conf, getReporter());
-            return reader;
-        } else {
-            TextInputFormat format = (TextInputFormat) conf.getInputFormat();
-            RecordReader reader = format.getRecordReader((org.apache.hadoop.mapred.FileSplit) inputSplits[splitIndex],
-                    conf, getReporter());
-            return reader;
-        }
-    }
-
-    private Reporter getReporter() {
-        Reporter reporter = new Reporter() {
-
-            @Override
-            public Counter getCounter(Enum<?> arg0) {
-                return null;
-            }
-
-            @Override
-            public Counter getCounter(String arg0, String arg1) {
-                return null;
-            }
-
-            @Override
-            public InputSplit getInputSplit() throws UnsupportedOperationException {
-                return null;
-            }
-
-            @Override
-            public void incrCounter(Enum<?> arg0, long arg1) {
-            }
-
-            @Override
-            public void incrCounter(String arg0, String arg1, long arg2) {
-            }
-
-            @Override
-            public void setStatus(String arg0) {
-            }
-
-            @Override
-            public void progress() {
-            }
-
-            @Override
-            public float getProgress() {
-                return 0.0f;
-            }
-        };
-
-        return reporter;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/AbstractExternalDatasetIndexesOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/AbstractExternalDatasetIndexesOperatorDescriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/AbstractExternalDatasetIndexesOperatorDescriptor.java
deleted file mode 100644
index 89abf0f..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/AbstractExternalDatasetIndexesOperatorDescriptor.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.operators;
-
-import java.util.List;
-
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.dataflow.std.base.AbstractOperatorNodePushable;
-import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeDataflowHelperFactory;
-import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeWithBuddyDataflowHelperFactory;
-import org.apache.hyracks.storage.am.lsm.rtree.dataflow.ExternalRTreeDataflowHelperFactory;
-
-// This is an operator that takes a single file index and an array of secondary indexes
-// it is intended to be used for 
-// 1. commit transaction operation
-// 2. abort transaction operation
-// 3. recover transaction operation
-public abstract class AbstractExternalDatasetIndexesOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
-
-    private static final long serialVersionUID = 1L;
-    private ExternalBTreeDataflowHelperFactory filesIndexDataflowHelperFactory;
-    private IndexInfoOperatorDescriptor fileIndexInfo;
-    private List<ExternalBTreeWithBuddyDataflowHelperFactory> bTreeIndexesDataflowHelperFactories;
-    private List<IndexInfoOperatorDescriptor> bTreeIndexesInfos;
-    private List<ExternalRTreeDataflowHelperFactory> rTreeIndexesDataflowHelperFactories;
-    private List<IndexInfoOperatorDescriptor> rTreeIndexesInfos;
-
-    public AbstractExternalDatasetIndexesOperatorDescriptor(IOperatorDescriptorRegistry spec,
-            ExternalBTreeDataflowHelperFactory filesIndexDataflowHelperFactory,
-            IndexInfoOperatorDescriptor fileIndexesInfo,
-            List<ExternalBTreeWithBuddyDataflowHelperFactory> bTreeIndexesDataflowHelperFactories,
-            List<IndexInfoOperatorDescriptor> bTreeIndexesInfos,
-            List<ExternalRTreeDataflowHelperFactory> rTreeIndexesDataflowHelperFactories,
-            List<IndexInfoOperatorDescriptor> rTreeIndexesInfos) {
-        super(spec, 0, 0);
-        this.filesIndexDataflowHelperFactory = filesIndexDataflowHelperFactory;
-        this.fileIndexInfo = fileIndexesInfo;
-        this.bTreeIndexesDataflowHelperFactories = bTreeIndexesDataflowHelperFactories;
-        this.bTreeIndexesInfos = bTreeIndexesInfos;
-        this.rTreeIndexesDataflowHelperFactories = rTreeIndexesDataflowHelperFactories;
-        this.rTreeIndexesInfos = rTreeIndexesInfos;
-    }
-
-    // opening and closing the index is done inside these methods since we don't always need open indexes
-    protected abstract void performOpOnIndex(
-            IIndexDataflowHelperFactory indexDataflowHelperFactory, IHyracksTaskContext ctx,
-            IndexInfoOperatorDescriptor fileIndexInfo, int partition) throws Exception;
-
-    @Override
-    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
-            IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
-        return new AbstractOperatorNodePushable() {
-
-            @Override
-            public void initialize() throws HyracksDataException {
-                try {
-                    // only in partition of device id = 0, we perform the operation on the files index
-                    if(fileIndexInfo.getFileSplitProvider().getFileSplits()[partition].getIODeviceId() == 0){
-                        performOpOnIndex(filesIndexDataflowHelperFactory, ctx, fileIndexInfo, partition);
-                    }
-                    // perform operation on btrees
-                    for (int i = 0; i < bTreeIndexesDataflowHelperFactories.size(); i++) {
-                        performOpOnIndex(bTreeIndexesDataflowHelperFactories.get(i), ctx,
-                                bTreeIndexesInfos.get(i), partition);
-                    }
-                    // perform operation on rtrees
-                    for (int i = 0; i < rTreeIndexesDataflowHelperFactories.size(); i++) {
-                        performOpOnIndex(rTreeIndexesDataflowHelperFactories.get(i), ctx,
-                                rTreeIndexesInfos.get(i), partition);
-                    }
-                } catch (Exception e) {
-                    // This should never happen <unless there is a hardware failure or something serious>
-                    throw new HyracksDataException(e);
-                }
-            }
-
-            @Override
-            public void deinitialize() throws HyracksDataException {
-            }
-
-            @Override
-            public int getInputArity() {
-                return 0;
-            }
-
-            @Override
-            public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc)
-                    throws HyracksDataException {
-            }
-
-            @Override
-            public IFrameWriter getInputFrameWriter(int index) {
-                return null;
-            }
-
-        };
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesAbortOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesAbortOperatorDescriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesAbortOperatorDescriptor.java
deleted file mode 100644
index 6ff991b..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesAbortOperatorDescriptor.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.operators;
-
-import java.util.List;
-
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import org.apache.hyracks.storage.am.common.util.IndexFileNameUtil;
-import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeDataflowHelperFactory;
-import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeWithBuddyDataflowHelperFactory;
-import org.apache.hyracks.storage.am.lsm.common.impls.AbortRecoverLSMIndexFileManager;
-import org.apache.hyracks.storage.am.lsm.rtree.dataflow.ExternalRTreeDataflowHelperFactory;
-
-public class ExternalDatasetIndexesAbortOperatorDescriptor extends AbstractExternalDatasetIndexesOperatorDescriptor {
-
-    private static final long serialVersionUID = 1L;
-
-    public ExternalDatasetIndexesAbortOperatorDescriptor(IOperatorDescriptorRegistry spec,
-            ExternalBTreeDataflowHelperFactory filesIndexDataflowHelperFactory,
-            IndexInfoOperatorDescriptor fileIndexesInfo,
-            List<ExternalBTreeWithBuddyDataflowHelperFactory> bTreeIndexesDataflowHelperFactories,
-            List<IndexInfoOperatorDescriptor> bTreeIndexesInfos,
-            List<ExternalRTreeDataflowHelperFactory> rTreeIndexesDataflowHelperFactories,
-            List<IndexInfoOperatorDescriptor> rTreeIndexesInfos) {
-        super(spec, filesIndexDataflowHelperFactory, fileIndexesInfo, bTreeIndexesDataflowHelperFactories,
-                bTreeIndexesInfos, rTreeIndexesDataflowHelperFactories, rTreeIndexesInfos);
-    }
-
-    @Override
-    protected void performOpOnIndex(IIndexDataflowHelperFactory indexDataflowHelperFactory, IHyracksTaskContext ctx,
-            IndexInfoOperatorDescriptor fileIndexInfo, int partition) throws Exception {
-        FileReference file = IndexFileNameUtil.getIndexAbsoluteFileRef(fileIndexInfo, partition, ctx.getIOManager());
-        AbortRecoverLSMIndexFileManager fileManager = new AbortRecoverLSMIndexFileManager(file);
-        fileManager.deleteTransactionFiles();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesCommitOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesCommitOperatorDescriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesCommitOperatorDescriptor.java
deleted file mode 100644
index e89a8db..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesCommitOperatorDescriptor.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.operators;
-
-import java.util.List;
-
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.storage.am.common.api.IIndex;
-import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
-import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import org.apache.hyracks.storage.am.common.util.IndexFileNameUtil;
-import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeDataflowHelperFactory;
-import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeWithBuddyDataflowHelperFactory;
-import org.apache.hyracks.storage.am.lsm.common.api.ITwoPCIndex;
-import org.apache.hyracks.storage.am.lsm.rtree.dataflow.ExternalRTreeDataflowHelperFactory;
-
-public class ExternalDatasetIndexesCommitOperatorDescriptor extends AbstractExternalDatasetIndexesOperatorDescriptor {
-
-    public ExternalDatasetIndexesCommitOperatorDescriptor(IOperatorDescriptorRegistry spec,
-            ExternalBTreeDataflowHelperFactory filesIndexDataflowHelperFactory,
-            IndexInfoOperatorDescriptor fileIndexesInfo,
-            List<ExternalBTreeWithBuddyDataflowHelperFactory> bTreeIndexesDataflowHelperFactories,
-            List<IndexInfoOperatorDescriptor> bTreeIndexesInfos,
-            List<ExternalRTreeDataflowHelperFactory> rTreeIndexesDataflowHelperFactories,
-            List<IndexInfoOperatorDescriptor> rTreeIndexesInfos) {
-        super(spec, filesIndexDataflowHelperFactory, fileIndexesInfo, bTreeIndexesDataflowHelperFactories,
-                bTreeIndexesInfos, rTreeIndexesDataflowHelperFactories, rTreeIndexesInfos);
-    }
-
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    protected void performOpOnIndex(IIndexDataflowHelperFactory indexDataflowHelperFactory, IHyracksTaskContext ctx,
-            IndexInfoOperatorDescriptor fileIndexInfo, int partition) throws Exception {
-        FileReference resourecePath = IndexFileNameUtil.getIndexAbsoluteFileRef(fileIndexInfo, partition, ctx.getIOManager());
-        System.err.println("performing the operation on "+ resourecePath.getFile().getAbsolutePath());
-        // Get DataflowHelper
-        IIndexDataflowHelper indexHelper = indexDataflowHelperFactory.createIndexDataflowHelper(fileIndexInfo, ctx, partition);
-        // Get index
-        IIndex index = indexHelper.getIndexInstance();
-        // commit transaction
-        ((ITwoPCIndex) index).commitTransaction();
-        System.err.println("operation on "+ resourecePath.getFile().getAbsolutePath() + " Succeded");
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesRecoverOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesRecoverOperatorDescriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesRecoverOperatorDescriptor.java
deleted file mode 100644
index 9bdfaa6..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesRecoverOperatorDescriptor.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.operators;
-
-import java.io.File;
-import java.util.List;
-
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import org.apache.hyracks.storage.am.common.util.IndexFileNameUtil;
-import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeDataflowHelperFactory;
-import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeWithBuddyDataflowHelperFactory;
-import org.apache.hyracks.storage.am.lsm.common.impls.AbortRecoverLSMIndexFileManager;
-import org.apache.hyracks.storage.am.lsm.rtree.dataflow.ExternalRTreeDataflowHelperFactory;
-
-public class ExternalDatasetIndexesRecoverOperatorDescriptor extends AbstractExternalDatasetIndexesOperatorDescriptor {
-
-    private static final long serialVersionUID = 1L;
-
-    public ExternalDatasetIndexesRecoverOperatorDescriptor(IOperatorDescriptorRegistry spec,
-            ExternalBTreeDataflowHelperFactory filesIndexDataflowHelperFactory,
-            IndexInfoOperatorDescriptor fileIndexesInfo,
-            List<ExternalBTreeWithBuddyDataflowHelperFactory> bTreeIndexesDataflowHelperFactories,
-            List<IndexInfoOperatorDescriptor> bTreeIndexesInfos,
-            List<ExternalRTreeDataflowHelperFactory> rTreeIndexesDataflowHelperFactories,
-            List<IndexInfoOperatorDescriptor> rTreeIndexesInfos) {
-        super(spec, filesIndexDataflowHelperFactory, fileIndexesInfo, bTreeIndexesDataflowHelperFactories,
-                bTreeIndexesInfos, rTreeIndexesDataflowHelperFactories, rTreeIndexesInfos);
-    }
-
-    @Override
-    protected void performOpOnIndex(IIndexDataflowHelperFactory indexDataflowHelperFactory, IHyracksTaskContext ctx,
-            IndexInfoOperatorDescriptor fileIndexInfo, int partition) throws Exception {
-        FileReference file = IndexFileNameUtil.getIndexAbsoluteFileRef(fileIndexInfo, partition, ctx.getIOManager());
-        AbortRecoverLSMIndexFileManager fileManager = new AbortRecoverLSMIndexFileManager(file);
-        fileManager.recoverTransaction();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalIndexBulkModifyOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalIndexBulkModifyOperatorDescriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalIndexBulkModifyOperatorDescriptor.java
deleted file mode 100644
index f56b3ae..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalIndexBulkModifyOperatorDescriptor.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.operators;
-
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.dataflow.value.ITypeTraits;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
-import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
-import org.apache.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
-import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
-import org.apache.hyracks.storage.common.IStorageManagerInterface;
-import org.apache.hyracks.storage.common.file.NoOpLocalResourceFactoryProvider;
-
-public class ExternalIndexBulkModifyOperatorDescriptor extends AbstractTreeIndexOperatorDescriptor {
-
-    private static final long serialVersionUID = 1L;
-    private final int[] deletedFiles;
-    private final int[] fieldPermutation;
-    private final float fillFactor;
-    private final long numElementsHint;
-
-    public ExternalIndexBulkModifyOperatorDescriptor(IOperatorDescriptorRegistry spec,
-            IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lifecycleManagerProvider,
-            IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
-            IBinaryComparatorFactory[] comparatorFactories, int[] bloomFilterKeyFields,
-            IIndexDataflowHelperFactory dataflowHelperFactory,
-            IModificationOperationCallbackFactory modificationOpCallbackFactory, int[] deletedFiles,
-            int[] fieldPermutation, float fillFactor, long numElementsHint) {
-        super(spec, 1, 0, null, storageManager, lifecycleManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, bloomFilterKeyFields, dataflowHelperFactory, null, false, false, null,
-                NoOpLocalResourceFactoryProvider.INSTANCE, NoOpOperationCallbackFactory.INSTANCE,
-                modificationOpCallbackFactory);
-        this.deletedFiles = deletedFiles;
-        this.fieldPermutation = fieldPermutation;
-        this.fillFactor = fillFactor;
-        this.numElementsHint = numElementsHint;
-    }
-
-    @Override
-    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
-            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
-        return new ExternalIndexBulkModifyOperatorNodePushable(this, ctx, partition, fieldPermutation, fillFactor,
-                numElementsHint, recordDescProvider, deletedFiles);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalIndexBulkModifyOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalIndexBulkModifyOperatorNodePushable.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalIndexBulkModifyOperatorNodePushable.java
deleted file mode 100644
index a9c9ac7..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalIndexBulkModifyOperatorNodePushable.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.operators;
-
-import java.nio.ByteBuffer;
-
-import org.apache.asterix.external.indexing.FilesIndexDescription;
-import org.apache.asterix.om.base.AMutableInt32;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.storage.am.common.api.ITwoPCIndexBulkLoader;
-import org.apache.hyracks.storage.am.common.api.IndexException;
-import org.apache.hyracks.storage.am.common.dataflow.IndexBulkLoadOperatorNodePushable;
-import org.apache.hyracks.storage.am.lsm.common.api.ITwoPCIndex;
-
-public class ExternalIndexBulkModifyOperatorNodePushable extends IndexBulkLoadOperatorNodePushable {
-
-    private final FilesIndexDescription filesIndexDescription = new FilesIndexDescription();
-    private final int[] deletedFiles;
-    private ArrayTupleBuilder buddyBTreeTupleBuilder = new ArrayTupleBuilder(
-            filesIndexDescription.FILE_BUDDY_BTREE_RECORD_DESCRIPTOR.getFieldCount());
-    private AMutableInt32 fileNumber = new AMutableInt32(0);
-    private ArrayTupleReference deleteTuple = new ArrayTupleReference();
-
-    public ExternalIndexBulkModifyOperatorNodePushable(ExternalIndexBulkModifyOperatorDescriptor opDesc,
-            IHyracksTaskContext ctx, int partition, int[] fieldPermutation, float fillFactor, long numElementsHint,
-            IRecordDescriptorProvider recordDescProvider, int[] deletedFiles) {
-        super(opDesc, ctx, partition, fieldPermutation, fillFactor, false, numElementsHint, false, recordDescProvider);
-        this.deletedFiles = deletedFiles;
-    }
-
-    // We override this method to do two things
-    // when creating the bulkLoader, it creates a transaction bulk loader
-    // It uses the bulkLoader to insert delete tuples for the deleted files
-    @Override
-    public void open() throws HyracksDataException {
-        RecordDescriptor recDesc = recDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
-        accessor = new FrameTupleAccessor(recDesc);
-        indexHelper.open();
-        index = indexHelper.getIndexInstance();
-        try {
-            writer.open();
-            // Transactional BulkLoader
-            bulkLoader = ((ITwoPCIndex) index).createTransactionBulkLoader(fillFactor, verifyInput, deletedFiles.length,
-                    checkIfEmptyIndex);
-            // Delete files
-            for (int i = 0; i < deletedFiles.length; i++) {
-                fileNumber.setValue(deletedFiles[i]);
-                filesIndexDescription.getBuddyBTreeTupleFromFileNumber(deleteTuple, buddyBTreeTupleBuilder, fileNumber);
-                ((ITwoPCIndexBulkLoader) bulkLoader).delete(deleteTuple);
-            }
-        } catch (Throwable e) {
-            throw new HyracksDataException(e);
-        }
-    }
-
-    @Override
-    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        accessor.reset(buffer);
-        int tupleCount = accessor.getTupleCount();
-        for (int i = 0; i < tupleCount; i++) {
-            tuple.reset(accessor, i);
-            try {
-                bulkLoader.add(tuple);
-            } catch (IndexException e) {
-                throw new HyracksDataException(e);
-            }
-        }
-    }
-
-    @Override
-    public void close() throws HyracksDataException {
-        if (index != null) {
-            try {
-                bulkLoader.end();
-            } catch (Throwable th) {
-                throw new HyracksDataException(th);
-            } finally {
-                try {
-                    indexHelper.close();
-                } finally {
-                    writer.close();
-                }
-            }
-        }
-    }
-
-    @Override
-    public void fail() throws HyracksDataException {
-        if (index != null) {
-            try {
-                ((ITwoPCIndexBulkLoader) bulkLoader).abort();
-            } finally {
-                writer.fail();
-            }
-        }
-    }
-}


Mime
View raw message