asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject [18/21] incubator-asterixdb git commit: First stage of external data cleanup
Date Sun, 03 Jan 2016 17:41:16 GMT
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/api/IResultCollector.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IResultCollector.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IResultCollector.java
new file mode 100755
index 0000000..9f14ec0
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IResultCollector.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.api;
+
+import java.io.DataOutput;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.om.base.AOrderedList;
+import org.apache.asterix.om.base.ARecord;
+import org.apache.asterix.om.base.IAObject;
+
+public interface IResultCollector {
+
+    public void writeIntResult(int result) throws AsterixException;
+
+    public void writeFloatResult(float result) throws AsterixException;
+
+    public void writeDoubleResult(double result) throws AsterixException;
+
+    public void writeStringResult(String result) throws AsterixException;
+
+    public void writeRecordResult(ARecord result) throws AsterixException;
+
+    public void writeListResult(AOrderedList list) throws AsterixException;
+
+    public IAObject getComplexTypeResultHolder();
+
+    public DataOutput getDataOutput();
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamDataParser.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamDataParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamDataParser.java
new file mode 100644
index 0000000..31d6317
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamDataParser.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.api;
+
+import java.io.DataOutput;
+import java.io.InputStream;
+
+public interface IStreamDataParser extends IDataParser {
+    /**
+     * Sets the inputStream for the parser. called only for parsers that support InputStreams
+     */
+    public void setInputStream(InputStream in) throws Exception;
+
+    /**
+     * Parse data into output AsterixDataModel binary records.
+     * Used with parsers that support stream sources
+     *
+     * @param out
+     *            DataOutput instance that for writing the parser output.
+     */
+
+    public boolean parse(DataOutput out) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamDataParserFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamDataParserFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamDataParserFactory.java
new file mode 100644
index 0000000..828f71e
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamDataParserFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.api;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface IStreamDataParserFactory extends IDataParserFactory {
+
+    public IStreamDataParser createInputStreamParser(IHyracksTaskContext ctx, int partition)
+            throws HyracksDataException, AsterixException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamFlowController.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamFlowController.java
new file mode 100644
index 0000000..d368c48
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamFlowController.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.api;
+
+public interface IStreamFlowController extends IDataFlowController {
+    public void setStreamParser(IStreamDataParser dataParser);
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractDataFlowController.java
new file mode 100644
index 0000000..d06161e
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractDataFlowController.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.dataflow;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.asterix.common.parse.ITupleForwarder;
+import org.apache.asterix.external.api.IDataFlowController;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public abstract class AbstractDataFlowController implements IDataFlowController {
+
+    protected ITupleForwarder tupleForwarder;
+    protected IHyracksTaskContext ctx;
+    protected Map<String, String> configuration;
+
+    @Override
+    public ITupleForwarder getTupleForwarder() {
+        return tupleForwarder;
+    }
+
+    @Override
+    public void setTupleForwarder(ITupleForwarder tupleForwarder) {
+        this.tupleForwarder = tupleForwarder;
+    }
+
+    protected void initializeTupleForwarder(IFrameWriter writer) throws HyracksDataException {
+        tupleForwarder.initialize(ctx, writer);
+    }
+
+    @Override
+    public void configure(Map<String, String> configuration, IHyracksTaskContext ctx) throws IOException {
+        this.configuration = configuration;
+        this.ctx = ctx;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/CounterTimerTupleForwarder.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/CounterTimerTupleForwarder.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/CounterTimerTupleForwarder.java
new file mode 100644
index 0000000..116ec09
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/CounterTimerTupleForwarder.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.dataflow;
+
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.parse.ITupleForwarder;
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+
+public class CounterTimerTupleForwarder implements ITupleForwarder {
+
+    public static final String BATCH_SIZE = "batch-size";
+    public static final String BATCH_INTERVAL = "batch-interval";
+
+    private static final Logger LOGGER = Logger.getLogger(CounterTimerTupleForwarder.class.getName());
+
+    private FrameTupleAppender appender;
+    private IFrame frame;
+    private IFrameWriter writer;
+    private int batchSize;
+    private long batchInterval;
+    private int tuplesInFrame = 0;
+    private TimeBasedFlushTask flushTask;
+    private Timer timer;
+    private Object lock = new Object();
+    private boolean activeTimer = false;
+
+    @Override
+    public void configure(Map<String, String> configuration) {
+        String propValue = configuration.get(BATCH_SIZE);
+        if (propValue != null) {
+            batchSize = Integer.parseInt(propValue);
+        } else {
+            batchSize = -1;
+        }
+
+        propValue = configuration.get(BATCH_INTERVAL);
+        if (propValue != null) {
+            batchInterval = Long.parseLong(propValue);
+            activeTimer = true;
+        }
+    }
+
+    @Override
+    public void initialize(IHyracksCommonContext ctx, IFrameWriter writer) throws HyracksDataException {
+        this.appender = new FrameTupleAppender();
+        this.frame = new VSizeFrame(ctx);
+        appender.reset(frame, true);
+        this.writer = writer;
+        if (activeTimer) {
+            this.timer = new Timer();
+            this.flushTask = new TimeBasedFlushTask(writer, lock);
+            timer.scheduleAtFixedRate(flushTask, 0, batchInterval);
+        }
+    }
+
+    @Override
+    public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException {
+        if (activeTimer) {
+            synchronized (lock) {
+                addTupleToFrame(tb);
+            }
+        } else {
+            addTupleToFrame(tb);
+        }
+        tuplesInFrame++;
+    }
+
+    private void addTupleToFrame(ArrayTupleBuilder tb) throws HyracksDataException {
+        if (tuplesInFrame == batchSize
+                || !appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("flushing frame containg (" + tuplesInFrame + ") tuples");
+            }
+            FrameUtils.flushFrame(frame.getBuffer(), writer);
+            tuplesInFrame = 0;
+            appender.reset(frame, true);
+            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                throw new IllegalStateException();
+            }
+        }
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        if (appender.getTupleCount() > 0) {
+            if (activeTimer) {
+                synchronized (lock) {
+                    FrameUtils.flushFrame(frame.getBuffer(), writer);
+                }
+            } else {
+                FrameUtils.flushFrame(frame.getBuffer(), writer);
+            }
+        }
+
+        if (timer != null) {
+            timer.cancel();
+        }
+    }
+
+    private class TimeBasedFlushTask extends TimerTask {
+
+        private IFrameWriter writer;
+        private final Object lock;
+
+        public TimeBasedFlushTask(IFrameWriter writer, Object lock) {
+            this.writer = writer;
+            this.lock = lock;
+        }
+
+        @Override
+        public void run() {
+            try {
+                if (tuplesInFrame > 0) {
+                    if (LOGGER.isLoggable(Level.INFO)) {
+                        LOGGER.info("TTL expired flushing frame (" + tuplesInFrame + ")");
+                    }
+                    synchronized (lock) {
+                        FrameUtils.flushFrame(frame.getBuffer(), writer);
+                        appender.reset(frame, true);
+                        tuplesInFrame = 0;
+                    }
+                }
+            } catch (HyracksDataException e) {
+                e.printStackTrace();
+            }
+        }
+
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FrameFullTupleForwarder.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FrameFullTupleForwarder.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FrameFullTupleForwarder.java
new file mode 100644
index 0000000..36d41b4
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FrameFullTupleForwarder.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.dataflow;
+
+import java.util.Map;
+
+import org.apache.asterix.common.parse.ITupleForwarder;
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+
+public class FrameFullTupleForwarder implements ITupleForwarder {
+
+    private FrameTupleAppender appender;
+    private IFrame frame;
+    private IFrameWriter writer;
+
+    @Override
+    public void configure(Map<String, String> configuration) {
+        // no-op
+    }
+
+    @Override
+    public void initialize(IHyracksCommonContext ctx, IFrameWriter writer) throws HyracksDataException {
+        this.appender = new FrameTupleAppender();
+        this.frame = new VSizeFrame(ctx);
+        this.writer = writer;
+        appender.reset(frame, true);
+    }
+
+    @Override
+    public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException {
+        boolean success = appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
+        if (!success) {
+            FrameUtils.flushFrame(frame.getBuffer(), writer);
+            appender.reset(frame, true);
+            success = appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
+            if (!success) {
+                throw new IllegalStateException();
+            }
+        }
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        if (appender.getTupleCount() > 0) {
+            FrameUtils.flushFrame(frame.getBuffer(), writer);
+        }
+
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java
new file mode 100644
index 0000000..68c6f9b
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.dataflow;
+
+import org.apache.asterix.external.api.IExternalIndexer;
+import org.apache.asterix.external.api.IIndexingDatasource;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+
+public class IndexingDataFlowController<T> extends RecordDataFlowController<T> {
+    IExternalIndexer indexer;
+
+    @Override
+    protected void appendOtherTupleFields(ArrayTupleBuilder tb) throws Exception {
+        indexer.index(tb);
+    }
+
+    @Override
+    public void setRecordReader(IRecordReader<T> recordReader) throws Exception {
+        super.setRecordReader(recordReader);
+        indexer = ((IIndexingDatasource) recordReader).getIndexer();
+        numOfTupleFields += indexer.getNumberOfFields();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java
new file mode 100644
index 0000000..99cc3d1
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.dataflow;
+
+import java.util.Map;
+
+import org.apache.asterix.common.parse.ITupleForwarder;
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+
+public class RateControlledTupleForwarder implements ITupleForwarder {
+
+    private FrameTupleAppender appender;
+    private IFrame frame;
+    private IFrameWriter writer;
+    private long interTupleInterval;
+    private boolean delayConfigured;
+
+    public static final String INTER_TUPLE_INTERVAL = "tuple-interval";
+
+    @Override
+    public void configure(Map<String, String> configuration) {
+        String propValue = configuration.get(INTER_TUPLE_INTERVAL);
+        if (propValue != null) {
+            interTupleInterval = Long.parseLong(propValue);
+        }
+        delayConfigured = interTupleInterval != 0;
+    }
+
+    @Override
+    public void initialize(IHyracksCommonContext ctx, IFrameWriter writer) throws HyracksDataException {
+        this.appender = new FrameTupleAppender();
+        this.frame = new VSizeFrame(ctx);
+        this.writer = writer;
+        appender.reset(frame, true);
+    }
+
+    @Override
+    public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException {
+        if (delayConfigured) {
+            try {
+                Thread.sleep(interTupleInterval);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
+        boolean success = appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
+        if (!success) {
+            FrameUtils.flushFrame(frame.getBuffer(), writer);
+            appender.reset(frame, true);
+            success = appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
+            if (!success) {
+                throw new IllegalStateException();
+            }
+        }
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        if (appender.getTupleCount() > 0) {
+            FrameUtils.flushFrame(frame.getBuffer(), writer);
+        }
+
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java
new file mode 100644
index 0000000..ad8e791
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.dataflow;
+
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordDataParser;
+import org.apache.asterix.external.api.IRecordFlowController;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+
+public class RecordDataFlowController<T> extends AbstractDataFlowController implements IRecordFlowController<T> {
+
+    protected IRecordDataParser<T> dataParser;
+    protected IRecordReader<? extends T> recordReader;
+    protected int numOfTupleFields = 1;
+
+    @Override
+    public void start(IFrameWriter writer) throws HyracksDataException {
+        try {
+            ArrayTupleBuilder tb = new ArrayTupleBuilder(numOfTupleFields);
+            initializeTupleForwarder(writer);
+            while (recordReader.hasNext()) {
+                IRawRecord<? extends T> record = recordReader.next();
+                tb.reset();
+                dataParser.parse(record, tb.getDataOutput());
+                tb.addFieldEndOffset();
+                appendOtherTupleFields(tb);
+                tupleForwarder.addTuple(tb);
+            }
+            tupleForwarder.close();
+            recordReader.close();
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    protected void appendOtherTupleFields(ArrayTupleBuilder tb) throws Exception {
+    }
+
+    @Override
+    public boolean stop() {
+        return false;
+    }
+
+    @Override
+    public boolean handleException(Throwable th) {
+        return false;
+    }
+
+    @Override
+    public void setRecordParser(IRecordDataParser<T> dataParser) {
+        this.dataParser = dataParser;
+    }
+
+    @Override
+    public void setRecordReader(IRecordReader<T> recordReader) throws Exception {
+        this.recordReader = recordReader;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/StreamDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/StreamDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/StreamDataFlowController.java
new file mode 100644
index 0000000..3016470
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/StreamDataFlowController.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.dataflow;
+
+import org.apache.asterix.external.api.IStreamDataParser;
+import org.apache.asterix.external.api.IStreamFlowController;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+
+public class StreamDataFlowController extends AbstractDataFlowController implements IStreamFlowController {
+    private IStreamDataParser dataParser;
+    private static final int NUMBER_OF_TUPLE_FIELDS = 1;
+
+    @Override
+    public void start(IFrameWriter writer) throws HyracksDataException {
+        try {
+            ArrayTupleBuilder tb = new ArrayTupleBuilder(NUMBER_OF_TUPLE_FIELDS);
+            initializeTupleForwarder(writer);
+            while (true) {
+                tb.reset();
+                if (!dataParser.parse(tb.getDataOutput())) {
+                    break;
+                }
+                tb.addFieldEndOffset();
+                tupleForwarder.addTuple(tb);
+            }
+            tupleForwarder.close();
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    @Override
+    public boolean stop() {
+        return false;
+    }
+
+    @Override
+    public boolean handleException(Throwable th) {
+        return false;
+    }
+
+    @Override
+    public void setStreamParser(IStreamDataParser dataParser) {
+        this.dataParser = dataParser;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/AzureTweetEntity.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/AzureTweetEntity.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/AzureTweetEntity.java
deleted file mode 100644
index ede820d..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/AzureTweetEntity.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.dataset.adapter;
-
-import com.microsoft.windowsazure.services.table.client.TableServiceEntity;
-
-public class AzureTweetEntity extends TableServiceEntity {
-
-    private String postingType;
-    private String json;
-
-    public AzureTweetEntity() {
-    }
-
-    public AzureTweetEntity(String userID, String postingID) {
-        this.partitionKey = userID;
-        this.rowKey = postingID;
-    }
-
-    public String getPostingType() {
-        return postingType;
-    }
-
-    public void setPostingType(String postingType) {
-        this.postingType = postingType;
-    }
-
-    public void setJSON(String json) {
-        this.json = json;
-    }
-
-    public String getJSON() {
-        return json;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/AzureTweetMetadataEntity.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/AzureTweetMetadataEntity.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/AzureTweetMetadataEntity.java
deleted file mode 100644
index ddda897..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/AzureTweetMetadataEntity.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.dataset.adapter;
-
-import com.microsoft.windowsazure.services.table.client.TableServiceEntity;
-
-public class AzureTweetMetadataEntity extends TableServiceEntity {
-    private String creationTimestamp;
-    private String postingType;
-    private String productId;
-    private String ethnicity;
-    private String gender;
-    private String sentiment;
-    private String location;
-
-    public AzureTweetMetadataEntity() {
-    }
-
-    public AzureTweetMetadataEntity(String partitionKey, String rowKey) {
-        this.partitionKey = partitionKey;
-        this.rowKey = rowKey;
-    }
-
-    public String getCreationTimestamp() {
-        return creationTimestamp;
-    }
-
-    public void setCreationTimestamp(String creationTimestamp) {
-        this.creationTimestamp = creationTimestamp;
-    }
-
-    public String getPostingType() {
-        return postingType;
-    }
-
-    public void setPostingType(String postingType) {
-        this.postingType = postingType;
-    }
-
-    public String getProductId() {
-        return productId;
-    }
-
-    public void setProductId(String productId) {
-        this.productId = productId;
-    }
-
-    public String getEthnicity() {
-        return ethnicity;
-    }
-
-    public void setEthnicity(String ethnicity) {
-        this.ethnicity = ethnicity;
-    }
-
-    public String getGender() {
-        return gender;
-    }
-
-    public void setGender(String gender) {
-        this.gender = gender;
-    }
-
-    public String getSentiment() {
-        return sentiment;
-    }
-
-    public void setSentiment(String sentiment) {
-        this.sentiment = sentiment;
-    }
-
-    public String getLocation() {
-        return location;
-    }
-
-    public void setLocation(String location) {
-        this.location = location;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/ClientBasedFeedAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/ClientBasedFeedAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/ClientBasedFeedAdapter.java
deleted file mode 100644
index a197368..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/ClientBasedFeedAdapter.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.dataset.adapter;
-
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.feeds.api.IFeedAdapter;
-import org.apache.asterix.common.parse.ITupleForwardPolicy;
-import org.apache.asterix.external.dataset.adapter.IFeedClient.InflowState;
-import org.apache.asterix.external.feeds.FeedPolicyEnforcer;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-
-/**
- * Acts as an abstract class for all pull-based external data adapters. Captures
- * the common logic for obtaining bytes from an external source and packing them
- * into frames as tuples.
- */
-public abstract class ClientBasedFeedAdapter implements IFeedAdapter {
-
-    private static final long serialVersionUID = 1L;
-    private static final Logger LOGGER = Logger.getLogger(ClientBasedFeedAdapter.class.getName());
-    private static final int timeout = 5; // seconds
-
-    protected ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(1);
-    protected IFeedClient pullBasedFeedClient;
-    protected ARecordType adapterOutputType;
-    protected boolean continueIngestion = true;
-    protected Map<String, String> configuration;
-
-    private FrameTupleAppender appender;
-    private IFrame frame;
-    private long tupleCount = 0;
-    private final IHyracksTaskContext ctx;
-    private int frameTupleCount = 0;
-
-    protected FeedPolicyEnforcer policyEnforcer;
-
-    public FeedPolicyEnforcer getPolicyEnforcer() {
-        return policyEnforcer;
-    }
-
-    public void setFeedPolicyEnforcer(FeedPolicyEnforcer policyEnforcer) {
-        this.policyEnforcer = policyEnforcer;
-    }
-
-    public abstract IFeedClient getFeedClient(int partition) throws Exception;
-
-    public abstract ITupleForwardPolicy getTupleParserPolicy();
-
-    public ClientBasedFeedAdapter(Map<String, String> configuration, IHyracksTaskContext ctx) {
-        this.ctx = ctx;
-        this.configuration = configuration;
-    }
-
-    public long getIngestedRecordsCount() {
-        return tupleCount;
-    }
-
-    @Override
-    public void start(int partition, IFrameWriter writer) throws Exception {
-        appender = new FrameTupleAppender();
-        frame = new VSizeFrame(ctx);
-        appender.reset(frame, true);
-        ITupleForwardPolicy policy = getTupleParserPolicy();
-        policy.configure(configuration);
-        pullBasedFeedClient = getFeedClient(partition);
-        InflowState inflowState = null;
-        policy.initialize(ctx, writer);
-        while (continueIngestion) {
-            tupleBuilder.reset();
-            try {
-                inflowState = pullBasedFeedClient.nextTuple(tupleBuilder.getDataOutput(), timeout);
-                switch (inflowState) {
-                    case DATA_AVAILABLE:
-                        tupleBuilder.addFieldEndOffset();
-                        policy.addTuple(tupleBuilder);
-                        frameTupleCount++;
-                        break;
-                    case NO_MORE_DATA:
-                        if (LOGGER.isLoggable(Level.INFO)) {
-                            LOGGER.info("Reached end of feed");
-                        }
-                        policy.close();
-                        tupleCount += frameTupleCount;
-                        frameTupleCount = 0;
-                        continueIngestion = false;
-                        break;
-                    case DATA_NOT_AVAILABLE:
-                        if (LOGGER.isLoggable(Level.WARNING)) {
-                            LOGGER.warning("Timed out on obtaining data from pull based adapter. Trying again!");
-                        }
-                        break;
-                }
-
-            } catch (Exception failureException) {
-                try {
-                    failureException.printStackTrace();
-                    boolean continueIngestion = policyEnforcer.continueIngestionPostSoftwareFailure(failureException);
-                    if (continueIngestion) {
-                        tupleBuilder.reset();
-                        continue;
-                    } else {
-                        throw failureException;
-                    }
-                } catch (Exception recoveryException) {
-                    throw new Exception(recoveryException);
-                }
-            }
-        }
-    }
-
-    /**
-     * Discontinue the ingestion of data and end the feed.
-     * 
-     * @throws Exception
-     */
-    @Override
-    public void stop() throws Exception {
-        continueIngestion = false;
-    }
-
-    public Map<String, String> getConfiguration() {
-        return configuration;
-    }
-
-    @Override
-    public boolean handleException(Exception e) {
-        return false;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedClient.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedClient.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedClient.java
deleted file mode 100644
index e321b67..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedClient.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.dataset.adapter;
-
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.builders.IARecordBuilder;
-import org.apache.asterix.builders.OrderedListBuilder;
-import org.apache.asterix.builders.RecordBuilder;
-import org.apache.asterix.builders.UnorderedListBuilder;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
-import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import org.apache.asterix.om.base.ABoolean;
-import org.apache.asterix.om.base.AInt32;
-import org.apache.asterix.om.base.AMutableDateTime;
-import org.apache.asterix.om.base.AMutableInt32;
-import org.apache.asterix.om.base.AMutableOrderedList;
-import org.apache.asterix.om.base.AMutablePoint;
-import org.apache.asterix.om.base.AMutableRecord;
-import org.apache.asterix.om.base.AMutableString;
-import org.apache.asterix.om.base.AMutableUnorderedList;
-import org.apache.asterix.om.base.AString;
-import org.apache.asterix.om.base.IACursor;
-import org.apache.asterix.om.base.IAObject;
-import org.apache.asterix.om.types.AOrderedListType;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.AUnorderedListType;
-import org.apache.asterix.om.types.BuiltinType;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
-
-public abstract class FeedClient implements IFeedClient {
-
-    protected static final Logger LOGGER = Logger.getLogger(FeedClient.class.getName());
-
-    protected ARecordSerializerDeserializer recordSerDe;
-    protected AMutableRecord mutableRecord;
-    protected boolean messageReceived;
-    protected boolean continueIngestion = true;
-    protected IARecordBuilder recordBuilder = new RecordBuilder();
-
-    protected AMutableString aString = new AMutableString("");
-    protected AMutableInt32 aInt32 = new AMutableInt32(0);
-    protected AMutablePoint aPoint = new AMutablePoint(0, 0);
-    protected AMutableDateTime aDateTime = new AMutableDateTime(0);
-
-    @SuppressWarnings("unchecked")
-    protected ISerializerDeserializer<AString> stringSerde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.ASTRING);
-    @SuppressWarnings("unchecked")
-    protected ISerializerDeserializer<ABoolean> booleanSerde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.ABOOLEAN);
-    @SuppressWarnings("unchecked")
-    protected ISerializerDeserializer<AInt32> int32Serde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.AINT32);
-
-    public abstract InflowState retrieveNextRecord() throws Exception;
-
-    @Override
-    public InflowState nextTuple(DataOutput dataOutput, int timeout) throws AsterixException {
-        try {
-            InflowState state = null;
-            int waitCount = 0;
-            boolean continueWait = true;
-            while ((state == null || state.equals(InflowState.DATA_NOT_AVAILABLE)) && continueWait) {
-                state = retrieveNextRecord();
-                switch (state) {
-                    case DATA_AVAILABLE:
-                        recordBuilder.reset(mutableRecord.getType());
-                        recordBuilder.init();
-                        writeRecord(mutableRecord, dataOutput, recordBuilder);
-                        break;
-                    case DATA_NOT_AVAILABLE:
-                        if (waitCount > timeout) {
-                            continueWait = false;
-                        } else {
-                            if (LOGGER.isLoggable(Level.WARNING)) {
-                                LOGGER.warning("Waiting to obtain data from pull based adaptor");
-                            }
-                            Thread.sleep(1000);
-                            waitCount++;
-                        }
-                        break;
-                    case NO_MORE_DATA:
-                        break;
-                }
-            }
-            return state;
-        } catch (Exception e) {
-            throw new AsterixException(e);
-        }
-
-    }
-
-    private void writeRecord(AMutableRecord record, DataOutput dataOutput, IARecordBuilder recordBuilder)
-            throws IOException, AsterixException {
-        ArrayBackedValueStorage fieldValue = new ArrayBackedValueStorage();
-        int numFields = record.getType().getFieldNames().length;
-        for (int pos = 0; pos < numFields; pos++) {
-            fieldValue.reset();
-            IAObject obj = record.getValueByPos(pos);
-            writeObject(obj, fieldValue.getDataOutput());
-            recordBuilder.addField(pos, fieldValue);
-        }
-        recordBuilder.write(dataOutput, true);
-    }
-
-    private void writeObject(IAObject obj, DataOutput dataOutput) throws IOException, AsterixException {
-        switch (obj.getType().getTypeTag()) {
-            case RECORD: {
-                IARecordBuilder recordBuilder = new RecordBuilder();
-                recordBuilder.reset((ARecordType) obj.getType());
-                recordBuilder.init();
-                writeRecord((AMutableRecord) obj, dataOutput, recordBuilder);
-                break;
-            }
-
-            case ORDEREDLIST: {
-                OrderedListBuilder listBuilder = new OrderedListBuilder();
-                listBuilder.reset((AOrderedListType) ((AMutableOrderedList) obj).getType());
-                IACursor cursor = ((AMutableOrderedList) obj).getCursor();
-                ArrayBackedValueStorage listItemValue = new ArrayBackedValueStorage();
-                while (cursor.next()) {
-                    listItemValue.reset();
-                    IAObject item = cursor.get();
-                    writeObject(item, listItemValue.getDataOutput());
-                    listBuilder.addItem(listItemValue);
-                }
-                listBuilder.write(dataOutput, true);
-                break;
-            }
-
-            case UNORDEREDLIST: {
-                UnorderedListBuilder listBuilder = new UnorderedListBuilder();
-                listBuilder.reset((AUnorderedListType) ((AMutableUnorderedList) obj).getType());
-                IACursor cursor = ((AMutableUnorderedList) obj).getCursor();
-                ArrayBackedValueStorage listItemValue = new ArrayBackedValueStorage();
-                while (cursor.next()) {
-                    listItemValue.reset();
-                    IAObject item = cursor.get();
-                    writeObject(item, listItemValue.getDataOutput());
-                    listBuilder.addItem(listItemValue);
-                }
-                listBuilder.write(dataOutput, true);
-                break;
-            }
-
-            default:
-                AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(obj.getType()).serialize(obj,
-                        dataOutput);
-                break;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FileSystemBasedAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FileSystemBasedAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FileSystemBasedAdapter.java
deleted file mode 100644
index ff9af0c..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FileSystemBasedAdapter.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.dataset.adapter;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.asterix.common.feeds.api.IDatasourceAdapter;
-import org.apache.asterix.om.types.IAType;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.std.file.ITupleParser;
-import org.apache.hyracks.dataflow.std.file.ITupleParserFactory;
-
-public abstract class FileSystemBasedAdapter implements IDatasourceAdapter {
-
-    private static final long serialVersionUID = 1L;
-
-    public static final String NODE_RESOLVER_FACTORY_PROPERTY = "node.Resolver";
-
-    public abstract InputStream getInputStream(int partition) throws IOException;
-
-    protected final ITupleParserFactory parserFactory;
-    protected ITupleParser tupleParser;
-    protected final IAType sourceDatatype;
-    protected IHyracksTaskContext ctx;
-
-    public FileSystemBasedAdapter(ITupleParserFactory parserFactory, IAType sourceDatatype, IHyracksTaskContext ctx)
-            throws HyracksDataException {
-        this.parserFactory = parserFactory;
-        this.sourceDatatype = sourceDatatype;
-        this.ctx = ctx;
-    }
-
-    @Override
-    public void start(int partition, IFrameWriter writer) throws Exception {
-        tupleParser = parserFactory.createTupleParser(ctx);
-        InputStream in = getInputStream(partition);
-        tupleParser.parse(in, writer);
-    }
-
-    public String getFilename(int partition) {
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
new file mode 100644
index 0000000..74e98dd
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.dataset.adapter;
+
+import org.apache.asterix.common.feeds.api.IDataSourceAdapter;
+import org.apache.asterix.external.api.IDataFlowController;
+import org.apache.hyracks.api.comm.IFrameWriter;
+
+public class GenericAdapter implements IDataSourceAdapter {
+
+    private static final long serialVersionUID = 1L;
+    private final IDataFlowController controller;
+
+    public GenericAdapter(IDataFlowController controller) {
+        this.controller = controller;
+    }
+
+    @Override
+    public void start(int partition, IFrameWriter writer) throws Exception {
+        controller.start(writer);
+    }
+
+    @Override
+    public boolean stop() throws Exception {
+        return controller.stop();
+    }
+
+    @Override
+    public boolean handleException(Throwable e) {
+        return controller.handleException(e);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/HDFSAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/HDFSAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/HDFSAdapter.java
deleted file mode 100644
index 5f1b1ae..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/HDFSAdapter.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.dataset.adapter;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.asterix.external.indexing.ExternalFile;
-import org.apache.asterix.external.indexing.input.GenericFileAwareRecordReader;
-import org.apache.asterix.external.indexing.input.GenericRecordReader;
-import org.apache.asterix.external.indexing.input.TextualDataReader;
-import org.apache.asterix.external.indexing.input.TextualFullScanDataReader;
-import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.std.file.ITupleParserFactory;
-
-/**
- * Provides functionality for fetching external data stored in an HDFS instance.
- */
-
-public class HDFSAdapter extends FileSystemBasedAdapter {
-
-    private static final long serialVersionUID = 1L;
-
-    private transient String[] readSchedule;
-    private transient boolean executed[];
-    private transient InputSplit[] inputSplits;
-    private transient JobConf conf;
-    private transient String nodeName;
-    private transient List<ExternalFile> files;
-    private transient Map<String, String> configuration;
-
-    public HDFSAdapter(IAType atype, String[] readSchedule, boolean[] executed, InputSplit[] inputSplits, JobConf conf,
-            String nodeName, ITupleParserFactory parserFactory, IHyracksTaskContext ctx,
-            Map<String, String> configuration, List<ExternalFile> files) throws HyracksDataException {
-        super(parserFactory, atype, ctx);
-        this.readSchedule = readSchedule;
-        this.executed = executed;
-        this.inputSplits = inputSplits;
-        this.conf = conf;
-        this.nodeName = nodeName;
-        this.files = files;
-        this.configuration = configuration;
-    }
-
-    /*
-     * The method below was modified to take care of the following
-     * 1. when target files are not null, it generates a file aware input stream that validate against the files
-     * 2. if the data is binary, it returns a generic reader
-     */
-    @Override
-    public InputStream getInputStream(int partition) throws IOException {
-        if ((conf.getInputFormat() instanceof TextInputFormat
-                || conf.getInputFormat() instanceof SequenceFileInputFormat)
-                && (AsterixTupleParserFactory.FORMAT_ADM
-                        .equalsIgnoreCase(configuration.get(AsterixTupleParserFactory.KEY_FORMAT))
-                        || AsterixTupleParserFactory.FORMAT_DELIMITED_TEXT
-                                .equalsIgnoreCase(configuration.get(AsterixTupleParserFactory.KEY_FORMAT)))) {
-            if (files != null) {
-                return new TextualDataReader(inputSplits, readSchedule, nodeName, conf, executed, files);
-            } else {
-                return new TextualFullScanDataReader(executed, inputSplits, readSchedule, nodeName, conf);
-            }
-        } else {
-            if (files != null) {
-                return new GenericFileAwareRecordReader(inputSplits, readSchedule, nodeName, conf, executed, files);
-            } else {
-                return new GenericRecordReader(inputSplits, readSchedule, nodeName, conf, executed);
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/HDFSIndexingAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/HDFSIndexingAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/HDFSIndexingAdapter.java
deleted file mode 100644
index 92a049d0..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/HDFSIndexingAdapter.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.dataset.adapter;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.List;
-
-import org.apache.asterix.external.adapter.factory.HDFSAdapterFactory;
-import org.apache.asterix.external.indexing.ExternalFile;
-import org.apache.asterix.external.indexing.input.GenericFileAwareRecordReader;
-import org.apache.asterix.external.indexing.input.RCFileDataReader;
-import org.apache.asterix.external.indexing.input.TextualDataReader;
-import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.dataflow.std.file.ITupleParserFactory;
-
-public class HDFSIndexingAdapter extends FileSystemBasedAdapter {
-
-    private static final long serialVersionUID = 1L;
-    private transient String[] readSchedule;
-    private transient boolean executed[];
-    private transient InputSplit[] inputSplits;
-    private transient JobConf conf;
-    private final List<ExternalFile> files;
-    private transient String nodeName;
-    // file input-format <text, seq, rc>
-    private String inputFormat;
-    // content format <adm, delimited-text, binary>
-    private String format;
-
-    public HDFSIndexingAdapter(IAType atype, String[] readSchedule, boolean[] executed, InputSplit[] inputSplits,
-            JobConf conf, AlgebricksPartitionConstraint clusterLocations, List<ExternalFile> files,
-            ITupleParserFactory parserFactory, IHyracksTaskContext ctx, String nodeName, String inputFormat,
-            String format) throws IOException {
-        super(parserFactory, atype, ctx);
-        this.nodeName = nodeName;
-        this.readSchedule = readSchedule;
-        this.executed = executed;
-        this.inputSplits = inputSplits;
-        this.conf = conf;
-        this.files = files;
-        this.inputFormat = inputFormat;
-        this.format = format;
-    }
-
-    @Override
-    public InputStream getInputStream(int partition) throws IOException {
-        if (inputFormat.equals(HDFSAdapterFactory.INPUT_FORMAT_RC)) {
-            return new RCFileDataReader(inputSplits, readSchedule, nodeName, conf, executed, files);
-        } else if (format.equals(AsterixTupleParserFactory.FORMAT_ADM)
-                || format.equals(AsterixTupleParserFactory.FORMAT_DELIMITED_TEXT)) {
-            return new TextualDataReader(inputSplits, readSchedule, nodeName, conf, executed, files);
-        } else {
-            return new GenericFileAwareRecordReader(inputSplits, readSchedule, nodeName, conf, executed, files);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/HiveAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/HiveAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/HiveAdapter.java
deleted file mode 100644
index 1b8024d..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/HiveAdapter.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.dataset.adapter;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.asterix.om.types.IAType;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.std.file.ITupleParserFactory;
-
-/**
- * Provides the functionality of fetching data in form of ADM records from a Hive dataset.
- */
-public class HiveAdapter extends FileSystemBasedAdapter {
-
-    private static final long serialVersionUID = 1L;
-
-    private HDFSAdapter hdfsAdapter;
-
-    public HiveAdapter(IAType atype, HDFSAdapter hdfsAdapter, ITupleParserFactory parserFactory, IHyracksTaskContext ctx)
-            throws HyracksDataException {
-        super(parserFactory, atype, ctx);
-        this.hdfsAdapter = hdfsAdapter;
-    }
-
-    @Override
-    public InputStream getInputStream(int partition) throws IOException {
-        return hdfsAdapter.getInputStream(partition);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/IControlledAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/IControlledAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/IControlledAdapter.java
deleted file mode 100644
index e71f10c..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/IControlledAdapter.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.dataset.adapter;
-
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.INullWriterFactory;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-/**
- *
- * @author alamouda
- *
- */
-public interface IControlledAdapter extends Serializable {
-    
-    /**
-     * 
-     * @param ctx
-     * @param recordDescriptors 
-     * @throws Exception
-     */
-    public void initialize(IHyracksTaskContext ctx, INullWriterFactory iNullWriterFactory) throws Exception;
-
-    /**
-     * 
-     * @param buffer
-     * @param writer
-     * @throws HyracksDataException
-     */
-    public void nextFrame(ByteBuffer buffer, IFrameWriter writer) throws Exception;
-
-    /**
-     * 
-     * @param writer
-     * @throws HyracksDataException
-     */
-    public void close(IFrameWriter writer) throws Exception;
-    
-    /**
-     * Gives the adapter a chance to clean up
-     * @param writer
-     * @throws HyracksDataException
-     */
-    public void fail() throws Exception;
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/IFeedClient.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/IFeedClient.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/IFeedClient.java
deleted file mode 100644
index 6377960..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/IFeedClient.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.dataset.adapter;
-
-import java.io.DataOutput;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-
-public interface IFeedClient {
-
-    public enum InflowState {
-        NO_MORE_DATA,
-        DATA_AVAILABLE,
-        DATA_NOT_AVAILABLE
-    }
-
-    /**
-     * Writes the next fetched tuple into the provided instance of DatatOutput. Invocation of this method blocks until
-     * a new tuple has been written or the specified time has expired.
-     * 
-     * @param dataOutput
-     *            The receiving channel for the feed client to write ADM records to.
-     * @param timeout
-     *            Threshold time (expressed in seconds) for the next tuple to be obtained from the external source.
-     * @return
-     * @throws AsterixException
-     */
-    public InflowState nextTuple(DataOutput dataOutput, int timeout) throws AsterixException;
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/IFeedClientFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/IFeedClientFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/IFeedClientFactory.java
deleted file mode 100644
index b175518..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/IFeedClientFactory.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.dataset.adapter;
-
-import java.util.Map;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-
-public interface IFeedClientFactory {
-
-    public IPullBasedFeedClient createFeedClient(IHyracksTaskContext ctx, Map<String, String> configuration)
-            throws Exception;
-
-    public ARecordType getRecordType() throws AsterixException;
-
-    public FeedClientType getFeedClientType();
-
-    public enum FeedClientType {
-        GENERIC,
-        TYPED
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/IPullBasedFeedClient.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/IPullBasedFeedClient.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/IPullBasedFeedClient.java
deleted file mode 100644
index f6c9dad..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/IPullBasedFeedClient.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.dataset.adapter;
-
-import java.io.DataOutput;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-
-public interface IPullBasedFeedClient {
-
-    public enum InflowState {
-        NO_MORE_DATA,
-        DATA_AVAILABLE,
-        DATA_NOT_AVAILABLE
-    }
-
-    /**
-     * Writes the next fetched tuple into the provided instance of DatatOutput. Invocation of this method blocks until
-     * a new tuple has been written or the specified time has expired.
-     * 
-     * @param dataOutput
-     *            The receiving channel for the feed client to write ADM records to.
-     * @param timeout
-     *            Threshold time (expressed in seconds) for the next tuple to be obtained from the externa source.
-     * @return
-     * @throws AsterixException
-     */
-    public InflowState nextTuple(DataOutput dataOutput, int timeout) throws AsterixException;
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LookupAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LookupAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LookupAdapter.java
new file mode 100644
index 0000000..ba6f83c
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LookupAdapter.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.dataset.adapter;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.external.api.ILookupRecordReader;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordDataParser;
+import org.apache.asterix.external.indexing.RecordId;
+import org.apache.asterix.external.indexing.RecordIdReader;
+import org.apache.asterix.external.util.DataflowUtils;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.INullWriter;
+import org.apache.hyracks.api.dataflow.value.INullWriterFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+
+public final class LookupAdapter<T> implements IFrameWriter {
+
+    private boolean propagateInput;
+    private int[] propagatedFields;
+    private boolean retainNull;
+    private ArrayTupleBuilder tb;
+    private FrameTupleAppender appender;
+    private IRecordDataParser<T> dataParser;
+    private ILookupRecordReader<? extends T> recordReader;
+    private RecordIdReader ridReader;
+    private FrameTupleAccessor tupleAccessor;
+    private IFrameWriter writer;
+    private FrameTupleReference frameTuple;
+    private ArrayTupleBuilder nullTupleBuild;
+
+    public LookupAdapter(IRecordDataParser<T> dataParser, ILookupRecordReader<? extends T> recordReader,
+            RecordDescriptor inRecDesc, RecordIdReader ridReader, boolean propagateInput, int[] propagatedFields,
+            boolean retainNull, INullWriterFactory iNullWriterFactory, IHyracksTaskContext ctx, IFrameWriter writer)
+                    throws HyracksDataException {
+        this.dataParser = dataParser;
+        this.recordReader = recordReader;
+        this.propagateInput = propagateInput;
+        this.propagatedFields = propagatedFields;
+        this.retainNull = retainNull;
+        this.tupleAccessor = new FrameTupleAccessor(inRecDesc);
+        this.ridReader = ridReader;
+        ridReader.set(tupleAccessor, inRecDesc);
+        configurePropagation(iNullWriterFactory);
+        appender = new FrameTupleAppender(new VSizeFrame(ctx));
+        this.writer = writer;
+    }
+
+    private void configurePropagation(INullWriterFactory iNullWriterFactory) {
+        if (propagateInput) {
+            tb = new ArrayTupleBuilder(propagatedFields.length + 1);
+            frameTuple = new FrameTupleReference();
+        } else {
+            tb = new ArrayTupleBuilder(1);
+        }
+        if (retainNull) {
+            INullWriter nullWriter = iNullWriterFactory.createNullWriter();
+            nullTupleBuild = new ArrayTupleBuilder(1);
+            DataOutput out = nullTupleBuild.getDataOutput();
+            try {
+                nullWriter.writeNull(out);
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        } else {
+            nullTupleBuild = null;
+        }
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        try {
+            recordReader.fail();
+        } catch (Throwable th) {
+            throw new HyracksDataException(th);
+        } finally {
+            writer.fail();
+        }
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        writer.open();
+
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        try {
+            tupleAccessor.reset(buffer);
+            int tupleIndex = 0;
+            int tupleCount = tupleAccessor.getTupleCount();
+            while (tupleIndex < tupleCount) {
+                IRawRecord<? extends T> record = null;
+                RecordId rid = ridReader.read(tupleIndex);
+                if (rid != null) {
+                    record = recordReader.read(rid);
+                }
+                tb.reset();
+                if (propagateInput) {
+                    propagate(tupleIndex);
+                }
+                if (record != null) {
+                    dataParser.parse(record, tb.getDataOutput());
+                    tb.addFieldEndOffset();
+                    DataflowUtils.addTupleToFrame(appender, tb, writer);
+                } else if (retainNull) {
+                    tb.getDataOutput().write(nullTupleBuild.getByteArray());
+                    tb.addFieldEndOffset();
+                    DataflowUtils.addTupleToFrame(appender, tb, writer);
+                }
+                tupleIndex++;
+            }
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    private void propagate(int idx) throws IOException {
+        frameTuple.reset(tupleAccessor, idx);
+        for (int i = 0; i < propagatedFields.length; i++) {
+            tb.getDataOutput().write(frameTuple.getFieldData(propagatedFields[i]),
+                    frameTuple.getFieldStart(propagatedFields[i]), frameTuple.getFieldLength(propagatedFields[i]));
+            tb.addFieldEndOffset();
+        }
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        try {
+            appender.flush(writer, true);
+        } finally {
+            writer.close();
+        }
+    }
+}



Mime
View raw message