asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject [2/5] incubator-asterixdb git commit: Add flush() to IFrameWriter
Date Fri, 29 Jan 2016 23:52:15 GMT
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
new file mode 100644
index 0000000..f38c2cb
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.twitter;
+
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.external.util.TwitterUtil;
+import org.apache.asterix.external.util.TwitterUtil.AuthenticationConstants;
+import org.apache.asterix.external.util.TwitterUtil.SearchAPIConstants;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+import twitter4j.Status;
+
+/** Factory that creates a pull or push Twitter record reader, as selected by {@link #configure(Map)}. */
+public class TwitterRecordReaderFactory implements IRecordReaderFactory<Status> {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOGGER = Logger.getLogger(TwitterRecordReaderFactory.class.getName());
+
+    private static final String DEFAULT_INTERVAL = "10"; // 10 seconds
+    private static final int INTAKE_CARDINALITY = 1; // degree of parallelism at intake stage
+
+    private Map<String, String> configuration;
+    private boolean pull;
+
+    @Override
+    public DataSourceType getDataSourceType() {
+        return DataSourceType.RECORDS;
+    }
+
+    @Override
+    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+        return new AlgebricksCountPartitionConstraint(INTAKE_CARDINALITY);
+    }
+
+    /**
+     * Validates the OAuth parameters and selects pull or push mode; a missing pull interval gets the default.
+     *
+     * @throws AsterixException if an OAuth parameter, the pull query, or the pull/push flag is missing
+     * @throws IllegalArgumentException if the pull interval is present but not an integer
+     */
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        this.configuration = configuration;
+        TwitterUtil.initializeConfigurationWithAuthInfo(configuration);
+        if (!validateConfiguration(configuration)) {
+            // AsterixException rather than a raw Exception, consistent with the other errors thrown here
+            throw new AsterixException("One or more parameters are missing from adapter configuration\n"
+                    + AuthenticationConstants.OAUTH_CONSUMER_KEY + "\n"
+                    + AuthenticationConstants.OAUTH_CONSUMER_SECRET + "\n"
+                    + AuthenticationConstants.OAUTH_ACCESS_TOKEN + "\n"
+                    + AuthenticationConstants.OAUTH_ACCESS_TOKEN_SECRET + "\n");
+        }
+        if (ExternalDataUtils.isPull(configuration)) {
+            pull = true;
+            if (configuration.get(SearchAPIConstants.QUERY) == null) {
+                throw new AsterixException(
+                        "parameter " + SearchAPIConstants.QUERY + " not specified as part of adaptor configuration");
+            }
+            String interval = configuration.get(SearchAPIConstants.INTERVAL);
+            if (interval != null) {
+                try {
+                    Integer.parseInt(interval);
+                } catch (NumberFormatException nfe) {
+                    // keep the parse failure as the cause so the offending value stays visible
+                    throw new IllegalArgumentException(
+                            "parameter " + SearchAPIConstants.INTERVAL + " is defined incorrectly, expecting a number",
+                            nfe);
+                }
+            } else {
+                configuration.put(SearchAPIConstants.INTERVAL, DEFAULT_INTERVAL);
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.warning(" Parameter " + SearchAPIConstants.INTERVAL + " not defined, using default ("
+                            + DEFAULT_INTERVAL + ")");
+                }
+            }
+        } else if (ExternalDataUtils.isPush(configuration)) {
+            pull = false;
+        } else {
+            throw new AsterixException("One of boolean parameters " + ExternalDataConstants.KEY_PULL + " and "
+                    + ExternalDataConstants.KEY_PUSH + " must be specified as part of adaptor configuration");
+        }
+    }
+
+    @Override
+    public boolean isIndexible() {
+        return false;
+    }
+
+    @Override
+    public IRecordReader<? extends Status> createRecordReader(IHyracksTaskContext ctx, int partition) throws Exception {
+        IRecordReader<Status> reader = pull ? new TwitterPullRecordReader() : new TwitterPushRecordReader();
+        reader.configure(configuration);
+        return reader;
+    }
+
+    @Override
+    public Class<? extends Status> getRecordClass() {
+        return Status.class;
+    }
+
+    /** Returns true only when all four OAuth parameters are present in the configuration. */
+    private boolean validateConfiguration(Map<String, String> configuration) {
+        return configuration.get(AuthenticationConstants.OAUTH_CONSUMER_KEY) != null
+                && configuration.get(AuthenticationConstants.OAUTH_CONSUMER_SECRET) != null
+                && configuration.get(AuthenticationConstants.OAUTH_ACCESS_TOKEN) != null
+                && configuration.get(AuthenticationConstants.OAUTH_ACCESS_TOKEN_SECRET) != null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStream.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStream.java
index 73f6195..ce65249 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStream.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStream.java
@@ -20,9 +20,14 @@ package org.apache.asterix.external.input.stream;
 
 import java.io.InputStream;
 
+import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+
 public abstract class AInputStream extends InputStream {
     public abstract boolean skipError() throws Exception;
 
     public abstract boolean stop() throws Exception;
 
+    public void setController(AbstractFeedDataFlowController controller) throws UnsupportedOperationException {
+        throw new UnsupportedOperationException(); // default: rejected unless a subclass overrides this method
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java
index 7ba6032..25418b0 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java
@@ -21,6 +21,8 @@ package org.apache.asterix.external.input.stream;
 import java.io.IOException;
 import java.io.InputStreamReader;
 
+import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+
 public class AInputStreamReader extends InputStreamReader {
     private AInputStream in;
 
@@ -40,4 +42,8 @@ public class AInputStreamReader extends InputStreamReader {
             throw new IOException(e);
         }
     }
+
+    public void setController(AbstractFeedDataFlowController controller) {
+        in.setController(controller); // forwards the controller to the AInputStream held in 'in'
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStreamProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStreamProvider.java
deleted file mode 100644
index 8f4c094..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStreamProvider.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.stream;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.asterix.external.api.IInputStreamProvider;
-import org.apache.asterix.external.indexing.ExternalFile;
-import org.apache.asterix.external.input.record.reader.HDFSRecordReader;
-import org.apache.asterix.external.provider.ExternalIndexerProvider;
-import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-
-public class HDFSInputStreamProvider<K> extends HDFSRecordReader<K, Text> implements IInputStreamProvider {
-
-    public HDFSInputStreamProvider(boolean read[], InputSplit[] inputSplits, String[] readSchedule, String nodeName,
-            JobConf conf, Map<String, String> configuration, List<ExternalFile> snapshot) throws Exception {
-        super(read, inputSplits, readSchedule, nodeName, conf);
-        value = new Text();
-        configure(configuration);
-        if (snapshot != null) {
-            setSnapshot(snapshot);
-            setIndexer(ExternalIndexerProvider.getIndexer(configuration));
-            if (currentSplitIndex < snapshot.size()) {
-                indexer.reset(this);
-            }
-        }
-    }
-
-    @Override
-    public AInputStream getInputStream() throws Exception {
-        return new HDFSInputStream();
-    }
-
-    private class HDFSInputStream extends AInputStream {
-        int pos = 0;
-
-        @Override
-        public int read() throws IOException {
-            if (value.getLength() < pos) {
-                if (!readMore()) {
-                    return -1;
-                }
-            } else if (value.getLength() == pos) {
-                pos++;
-                return ExternalDataConstants.BYTE_LF;
-            }
-            return value.getBytes()[pos++];
-        }
-
-        private int readRecord(byte[] buffer, int offset, int len) {
-            int actualLength = value.getLength() + 1;
-            if ((actualLength - pos) > len) {
-                //copy partial record
-                System.arraycopy(value.getBytes(), pos, buffer, offset, len);
-                pos += len;
-                return len;
-            } else {
-                int numBytes = value.getLength() - pos;
-                System.arraycopy(value.getBytes(), pos, buffer, offset, numBytes);
-                buffer[offset + numBytes] = ExternalDataConstants.LF;
-                pos += numBytes;
-                numBytes++;
-                return numBytes;
-            }
-        }
-
-        @Override
-        public int read(byte[] buffer, int offset, int len) throws IOException {
-            if (value.getLength() > pos) {
-                return readRecord(buffer, offset, len);
-            }
-            if (!readMore()) {
-                return -1;
-            }
-            return readRecord(buffer, offset, len);
-        }
-
-        private boolean readMore() throws IOException {
-            try {
-                pos = 0;
-                return HDFSInputStreamProvider.this.hasNext();
-            } catch (Exception e) {
-                throw new IOException(e);
-            }
-        }
-
-        @Override
-        public boolean skipError() throws Exception {
-            return true;
-        }
-
-        @Override
-        public boolean stop() throws Exception {
-            return false;
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStreamProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStreamProvider.java
deleted file mode 100644
index 22d0a87..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStreamProvider.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.stream;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Path;
-import java.util.Map;
-
-import org.apache.asterix.external.api.IInputStreamProvider;
-import org.apache.asterix.external.util.FeedLogManager;
-import org.apache.asterix.external.util.FeedUtils;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.dataflow.std.file.FileSplit;
-
-public class LocalFSInputStreamProvider implements IInputStreamProvider {
-
-    private String expression;
-    private boolean isFeed;
-    private Path path;
-    private File feedLogFile;
-
-    public LocalFSInputStreamProvider(FileSplit[] fileSplits, IHyracksTaskContext ctx,
-            Map<String, String> configuration, int partition, String expression, boolean isFeed,
-            FileSplit[] feedLogFileSplits) {
-        this.expression = expression;
-        this.isFeed = isFeed;
-        this.path = fileSplits[partition].getLocalFile().getFile().toPath();
-        if (feedLogFileSplits != null) {
-            this.feedLogFile = FeedUtils
-                    .getAbsoluteFileRef(feedLogFileSplits[partition].getLocalFile().getFile().getPath(),
-                            feedLogFileSplits[partition].getIODeviceId(), ctx.getIOManager())
-                    .getFile();
-
-        }
-    }
-
-    @Override
-    public AInputStream getInputStream() throws IOException {
-        FeedLogManager feedLogManager = null;
-        if (isFeed && feedLogFile != null) {
-            feedLogManager = new FeedLogManager(feedLogFile);
-        }
-        return new LocalFileSystemInputStream(path, expression, feedLogManager, isFeed);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFileSystemInputStream.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFileSystemInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFileSystemInputStream.java
index 7eebe4c..7b7cd8b 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFileSystemInputStream.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFileSystemInputStream.java
@@ -22,6 +22,7 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.nio.file.Path;
 
+import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.FeedLogManager;
 import org.apache.asterix.external.util.FileSystemWatcher;
@@ -39,6 +40,11 @@ public class LocalFileSystemInputStream extends AInputStream {
     }
 
     @Override
+    public void setController(AbstractFeedDataFlowController controller) {
+        watcher.setController(controller); // hands the controller on to the watcher
+    }
+
+    @Override
     public void close() throws IOException {
         IOException ioe = null;
         if (in != null) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStreamProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStreamProvider.java
deleted file mode 100644
index 1f920e9..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStreamProvider.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.stream;
-
-import java.net.ServerSocket;
-
-import org.apache.asterix.external.api.IInputStreamProvider;
-
-public class SocketInputStreamProvider implements IInputStreamProvider {
-    private ServerSocket server;
-
-    public SocketInputStreamProvider(ServerSocket server) {
-        this.server = server;
-    }
-
-    @Override
-    public AInputStream getInputStream() throws Exception {
-        return new SocketInputStream(server);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/TwitterFirehoseInputStreamProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/TwitterFirehoseInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/TwitterFirehoseInputStreamProvider.java
deleted file mode 100644
index 7c64aa3..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/TwitterFirehoseInputStreamProvider.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.stream;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.PipedInputStream;
-import java.io.PipedOutputStream;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.external.api.IInputStreamProvider;
-import org.apache.asterix.external.util.TweetGenerator;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-
-public class TwitterFirehoseInputStreamProvider implements IInputStreamProvider {
-
-    private static final Logger LOGGER = Logger.getLogger(TwitterFirehoseInputStreamProvider.class.getName());
-
-    private ExecutorService executorService;
-
-    private PipedOutputStream outputStream;
-
-    private PipedInputStream inputStream;
-
-    private TwitterServer twitterServer;
-
-    public TwitterFirehoseInputStreamProvider(Map<String, String> configuration, IHyracksTaskContext ctx, int partition)
-            throws Exception {
-        executorService = Executors.newCachedThreadPool();
-        outputStream = new PipedOutputStream();
-        inputStream = new PipedInputStream(outputStream);
-        twitterServer = new TwitterServer(configuration, partition, outputStream, executorService, inputStream);
-    }
-
-    @Override
-    public AInputStream getInputStream() throws Exception {
-        twitterServer.start();
-        return twitterServer;
-    }
-
-    private static class TwitterServer extends AInputStream {
-        private final DataProvider dataProvider;
-        private final ExecutorService executorService;
-        private InputStream in;
-        private boolean started;
-
-        public TwitterServer(Map<String, String> configuration, int partition, OutputStream os,
-                ExecutorService executorService, InputStream in) throws Exception {
-            dataProvider = new DataProvider(configuration, partition, os);
-            this.executorService = executorService;
-            this.in = in;
-            this.started = false;
-        }
-
-        @Override
-        public boolean stop() throws IOException {
-            dataProvider.stop();
-            return true;
-        }
-
-        public void start() {
-            executorService.execute(dataProvider);
-        }
-
-        @Override
-        public boolean skipError() throws Exception {
-            return false;
-        }
-
-        @Override
-        public int read() throws IOException {
-            if (!started) {
-                start();
-                started = true;
-            }
-            return in.read();
-        }
-
-        @Override
-        public int read(byte b[], int off, int len) throws IOException {
-            if (!started) {
-                start();
-                started = true;
-            }
-            return in.read(b, off, len);
-        }
-    }
-
-    private static class DataProvider implements Runnable {
-
-        public static final String KEY_MODE = "mode";
-
-        private TweetGenerator tweetGenerator;
-        private boolean continuePush = true;
-        private int batchSize;
-        private final Mode mode;
-        private final OutputStream os;
-
-        public static enum Mode {
-            AGGRESSIVE,
-            CONTROLLED
-        }
-
-        public DataProvider(Map<String, String> configuration, int partition, OutputStream os) throws Exception {
-            this.tweetGenerator = new TweetGenerator(configuration, partition);
-            this.tweetGenerator.registerSubscriber(os);
-            this.os = os;
-            mode = configuration.get(KEY_MODE) != null ? Mode.valueOf(configuration.get(KEY_MODE).toUpperCase())
-                    : Mode.AGGRESSIVE;
-            switch (mode) {
-                case CONTROLLED:
-                    String tpsValue = configuration.get(TweetGenerator.KEY_TPS);
-                    if (tpsValue == null) {
-                        throw new IllegalArgumentException("TPS value not configured. use tps=<value>");
-                    }
-                    batchSize = Integer.parseInt(tpsValue);
-                    break;
-                case AGGRESSIVE:
-                    batchSize = 5000;
-                    break;
-            }
-        }
-
-        @Override
-        public void run() {
-            boolean moreData = true;
-            long startBatch;
-            long endBatch;
-
-            while (true) {
-                try {
-                    while (moreData && continuePush) {
-                        switch (mode) {
-                            case AGGRESSIVE:
-                                moreData = tweetGenerator.generateNextBatch(batchSize);
-                                break;
-                            case CONTROLLED:
-                                startBatch = System.currentTimeMillis();
-                                moreData = tweetGenerator.generateNextBatch(batchSize);
-                                endBatch = System.currentTimeMillis();
-                                if (endBatch - startBatch < 1000) {
-                                    Thread.sleep(1000 - (endBatch - startBatch));
-                                }
-                                break;
-                        }
-                    }
-                    os.close();
-                    break;
-                } catch (Exception e) {
-                    if (LOGGER.isLoggable(Level.WARNING)) {
-                        LOGGER.warning("Exception in adaptor " + e.getMessage());
-                    }
-                }
-            }
-        }
-
-        public void stop() {
-            continuePush = false;
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java
index ab1f8a0..06833af 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java
@@ -28,7 +28,7 @@ import org.apache.asterix.external.api.IInputStreamProvider;
 import org.apache.asterix.external.api.IInputStreamProviderFactory;
 import org.apache.asterix.external.api.INodeResolver;
 import org.apache.asterix.external.api.INodeResolverFactory;
-import org.apache.asterix.external.input.stream.LocalFSInputStreamProvider;
+import org.apache.asterix.external.input.stream.provider.LocalFSInputStreamProvider;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.external.util.FeedUtils;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketInputStreamProviderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketInputStreamProviderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketInputStreamProviderFactory.java
index 37afa53..ea60f43 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketInputStreamProviderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketInputStreamProviderFactory.java
@@ -30,7 +30,7 @@ import java.util.Set;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.external.api.IInputStreamProvider;
 import org.apache.asterix.external.api.IInputStreamProviderFactory;
-import org.apache.asterix.external.input.stream.SocketInputStreamProvider;
+import org.apache.asterix.external.input.stream.provider.SocketInputStreamProvider;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.om.util.AsterixRuntimeUtil;
 import org.apache.commons.lang3.StringUtils;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/HDFSInputStreamProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/HDFSInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/HDFSInputStreamProvider.java
new file mode 100644
index 0000000..93a1685
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/HDFSInputStreamProvider.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream.provider;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.external.api.IInputStreamProvider;
+import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.external.input.record.reader.hdfs.HDFSRecordReader;
+import org.apache.asterix.external.input.stream.AInputStream;
+import org.apache.asterix.external.provider.ExternalIndexerProvider;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * Serves the {@link Text} records read by the inherited {@link HDFSRecordReader} as a byte stream in which each
+ * record is followed by a line feed.
+ */
+public class HDFSInputStreamProvider<K> extends HDFSRecordReader<K, Text> implements IInputStreamProvider {
+
+    /**
+     * Forwards the split/schedule arguments to the record reader, applies the configuration and, when a snapshot
+     * is given, installs the snapshot and the indexer obtained from {@link ExternalIndexerProvider}.
+     *
+     * @param configuration
+     *            passed to {@code configure} and to the indexer lookup
+     * @param snapshot
+     *            external files snapshot; {@code null} skips the snapshot/indexer setup
+     * @throws Exception
+     *             propagated from the record reader or indexer setup
+     */
+    public HDFSInputStreamProvider(boolean read[], InputSplit[] inputSplits, String[] readSchedule, String nodeName,
+            JobConf conf, Map<String, String> configuration, List<ExternalFile> snapshot) throws Exception {
+        super(read, inputSplits, readSchedule, nodeName, conf);
+        value = new Text();
+        configure(configuration);
+        if (snapshot != null) {
+            setSnapshot(snapshot);
+            setIndexer(ExternalIndexerProvider.getIndexer(configuration));
+            if (currentSplitIndex < snapshot.size()) {
+                indexer.reset(this);
+            }
+        }
+    }
+
+    /**
+     * @return a new stream view over the records of this reader
+     */
+    @Override
+    public AInputStream getInputStream() throws Exception {
+        return new HDFSInputStream();
+    }
+
+    /**
+     * Byte stream over the current record held in {@code value}. Offsets {@code [0, length)} address the record
+     * bytes and offset {@code length} stands for the line feed that terminates the record, so a record is fully
+     * consumed only once {@code pos > length}.
+     */
+    private class HDFSInputStream extends AInputStream {
+        // Offset of the next byte to hand out from the current record (see class comment).
+        private int pos = 0;
+        // True only after readMore() has successfully fetched a record; prevents emitting bytes (or a line
+        // feed) for the initial empty value or for a stale record after the end of the input.
+        private boolean recordLoaded = false;
+
+        /** @return true while the current record still has data bytes or its line feed to deliver */
+        private boolean hasPendingBytes() {
+            return recordLoaded && pos <= value.getLength();
+        }
+
+        /**
+         * @return the next byte as a value in {@code 0..255}, or {@code -1} when no further record is available
+         */
+        @Override
+        public int read() throws IOException {
+            if (!hasPendingBytes() && !readMore()) {
+                return -1;
+            }
+            if (pos == value.getLength()) {
+                // all data bytes delivered (or the record is empty): emit the terminating line feed
+                pos++;
+                return ExternalDataConstants.BYTE_LF;
+            }
+            // mask to honour the InputStream contract; a raw byte >= 0x80 would be returned as a negative int
+            return value.getBytes()[pos++] & 0xFF;
+        }
+
+        /**
+         * Copies as much of the current record as fits into {@code buffer}; the line feed is appended only when
+         * all remaining data bytes fit with one byte to spare.
+         *
+         * @return number of bytes written into {@code buffer}
+         */
+        private int readRecord(byte[] buffer, int offset, int len) {
+            int remaining = value.getLength() - pos;
+            if (remaining >= len) {
+                // partial record: no room left for the line feed, it stays pending for the next call
+                System.arraycopy(value.getBytes(), pos, buffer, offset, len);
+                pos += len;
+                return len;
+            }
+            System.arraycopy(value.getBytes(), pos, buffer, offset, remaining);
+            buffer[offset + remaining] = ExternalDataConstants.LF;
+            // step past the virtual line-feed slot so the record counts as fully consumed
+            pos += remaining + 1;
+            return remaining + 1;
+        }
+
+        /**
+         * @return number of bytes read, {@code 0} when {@code len} is zero, or {@code -1} when no further record
+         *         is available
+         */
+        @Override
+        public int read(byte[] buffer, int offset, int len) throws IOException {
+            if (len == 0) {
+                return 0;
+            }
+            if (!hasPendingBytes() && !readMore()) {
+                return -1;
+            }
+            return readRecord(buffer, offset, len);
+        }
+
+        /**
+         * Advances the enclosing reader to its next record.
+         * NOTE(review): relies on hasNext() loading the next record into {@code value} - confirm against
+         * HDFSRecordReader.
+         *
+         * @return true when a new record is available
+         * @throws IOException
+         *             wrapping any failure reported by the reader
+         */
+        private boolean readMore() throws IOException {
+            // cleared first so that a failure below cannot leave a stale record marked as readable
+            recordLoaded = false;
+            pos = 0;
+            try {
+                recordLoaded = HDFSInputStreamProvider.this.hasNext();
+                return recordLoaded;
+            } catch (Exception e) {
+                throw new IOException(e);
+            }
+        }
+
+        @Override
+        public boolean skipError() throws Exception {
+            return true;
+        }
+
+        @Override
+        public boolean stop() throws Exception {
+            return false;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/LocalFSInputStreamProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/LocalFSInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/LocalFSInputStreamProvider.java
new file mode 100644
index 0000000..4c4edd3
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/LocalFSInputStreamProvider.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream.provider;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Map;
+
+import org.apache.asterix.external.api.IInputStreamProvider;
+import org.apache.asterix.external.input.stream.AInputStream;
+import org.apache.asterix.external.input.stream.LocalFileSystemInputStream;
+import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.asterix.external.util.FeedUtils;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.dataflow.std.file.FileSplit;
+
+/**
+ * Creates {@link LocalFileSystemInputStream}s for the local file assigned to one partition.
+ */
+public class LocalFSInputStreamProvider implements IInputStreamProvider {
+
+    private String expression;
+    private boolean isFeed;
+    private Path path;
+    private File feedLogFile;
+
+    /**
+     * Resolves the data path of the given partition and, when feed log splits are supplied, the feed log file of
+     * that partition.
+     *
+     * @param fileSplits
+     *            data file splits, indexed by partition
+     * @param ctx
+     *            task context whose IO manager is used to resolve the feed log file
+     * @param partition
+     *            index into {@code fileSplits} and {@code feedLogFileSplits}
+     * @param feedLogFileSplits
+     *            feed log splits, or {@code null} when there is no feed log
+     */
+    public LocalFSInputStreamProvider(FileSplit[] fileSplits, IHyracksTaskContext ctx,
+            Map<String, String> configuration, int partition, String expression, boolean isFeed,
+            FileSplit[] feedLogFileSplits) {
+        this.expression = expression;
+        this.isFeed = isFeed;
+        this.path = fileSplits[partition].getLocalFile().getFile().toPath();
+        if (feedLogFileSplits == null) {
+            return;
+        }
+        FileSplit logSplit = feedLogFileSplits[partition];
+        String logPath = logSplit.getLocalFile().getFile().getPath();
+        this.feedLogFile = FeedUtils.getAbsoluteFileRef(logPath, logSplit.getIODeviceId(), ctx.getIOManager())
+                .getFile();
+    }
+
+    /**
+     * @return a stream over the partition's path; a {@link FeedLogManager} is attached only for feeds that have
+     *         a feed log file
+     */
+    @Override
+    public AInputStream getInputStream() throws IOException {
+        FeedLogManager logManager = (isFeed && feedLogFile != null) ? new FeedLogManager(feedLogFile) : null;
+        return new LocalFileSystemInputStream(path, expression, logManager, isFeed);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/SocketInputStreamProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/SocketInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/SocketInputStreamProvider.java
new file mode 100644
index 0000000..2b12675
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/SocketInputStreamProvider.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream.provider;
+
+import java.net.ServerSocket;
+
+import org.apache.asterix.external.api.IInputStreamProvider;
+import org.apache.asterix.external.input.stream.AInputStream;
+import org.apache.asterix.external.input.stream.SocketInputStream;
+
+/**
+ * Hands out {@link SocketInputStream}s that are built on a caller-supplied {@link ServerSocket}.
+ */
+public class SocketInputStreamProvider implements IInputStreamProvider {
+
+    /** Server socket passed to every stream created by this provider. */
+    private final ServerSocket server;
+
+    /**
+     * @param server
+     *            the server socket the created streams are constructed with
+     */
+    public SocketInputStreamProvider(ServerSocket server) {
+        this.server = server;
+    }
+
+    /**
+     * @return a new {@link SocketInputStream} wrapping the configured server socket
+     */
+    @Override
+    public AInputStream getInputStream() throws Exception {
+        AInputStream stream = new SocketInputStream(server);
+        return stream;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java
new file mode 100644
index 0000000..06f7e72
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream.provider;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.external.api.IInputStreamProvider;
+import org.apache.asterix.external.input.stream.AInputStream;
+import org.apache.asterix.external.util.TweetGenerator;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * Provides a stream of tweets produced by a {@link TweetGenerator}. The generator runs as a task on an executor
+ * and writes into a {@link PipedOutputStream}; consumers read from the connected {@link PipedInputStream}.
+ */
+public class TwitterFirehoseInputStreamProvider implements IInputStreamProvider {
+
+    private static final Logger LOGGER = Logger.getLogger(TwitterFirehoseInputStreamProvider.class.getName());
+
+    private ExecutorService executorService;
+
+    private PipedOutputStream outputStream;
+
+    private PipedInputStream inputStream;
+
+    private TwitterServer twitterServer;
+
+    /**
+     * Wires the pipe and the generator task; nothing is started until the stream is requested or read.
+     *
+     * @param configuration
+     *            adapter configuration; read for the {@code mode} key and handed to the tweet generator
+     * @param ctx
+     *            task context (not used by this constructor)
+     * @param partition
+     *            partition number handed to the tweet generator
+     * @throws Exception
+     *             if the pipe or the generator cannot be set up
+     */
+    public TwitterFirehoseInputStreamProvider(Map<String, String> configuration, IHyracksTaskContext ctx, int partition)
+            throws Exception {
+        executorService = Executors.newCachedThreadPool();
+        outputStream = new PipedOutputStream();
+        inputStream = new PipedInputStream(outputStream);
+        twitterServer = new TwitterServer(configuration, partition, outputStream, executorService, inputStream);
+    }
+
+    /**
+     * Starts the generator task (if it is not running yet) and returns the stream backed by it.
+     */
+    @Override
+    public AInputStream getInputStream() throws Exception {
+        twitterServer.start();
+        return twitterServer;
+    }
+
+    /**
+     * Read side of the pipe; also owns the lifecycle of the generator task.
+     */
+    private static class TwitterServer extends AInputStream {
+        private final DataProvider dataProvider;
+        private final ExecutorService executorService;
+        private InputStream in;
+        private boolean started;
+
+        public TwitterServer(Map<String, String> configuration, int partition, OutputStream os,
+                ExecutorService executorService, InputStream in) throws Exception {
+            dataProvider = new DataProvider(configuration, partition, os);
+            this.executorService = executorService;
+            this.in = in;
+            this.started = false;
+        }
+
+        /**
+         * Asks the generator task to stop pushing data.
+         *
+         * @return always {@code true}
+         */
+        @Override
+        public boolean stop() throws IOException {
+            dataProvider.stop();
+            return true;
+        }
+
+        /**
+         * Submits the generator task to the executor. Calling it again is a no-op: the task writes to a single
+         * pipe and closes it when done, so it must never be submitted twice.
+         */
+        public void start() {
+            if (started) {
+                return;
+            }
+            executorService.execute(dataProvider);
+            started = true;
+        }
+
+        @Override
+        public boolean skipError() throws Exception {
+            return false;
+        }
+
+        /** Reads one byte from the pipe, starting the generator task first if necessary. */
+        @Override
+        public int read() throws IOException {
+            start();
+            return in.read();
+        }
+
+        /** Reads up to {@code len} bytes from the pipe, starting the generator task first if necessary. */
+        @Override
+        public int read(byte b[], int off, int len) throws IOException {
+            start();
+            return in.read(b, off, len);
+        }
+    }
+
+    /**
+     * Generator task: repeatedly asks the {@link TweetGenerator} for the next batch until it reports no more data
+     * or {@link #stop()} is called, then closes the output stream.
+     */
+    private static class DataProvider implements Runnable {
+
+        public static final String KEY_MODE = "mode";
+
+        private TweetGenerator tweetGenerator;
+        // written by stop() from another thread than the one executing run(), hence volatile
+        private volatile boolean continuePush = true;
+        private int batchSize;
+        private final Mode mode;
+        private final OutputStream os;
+
+        public static enum Mode {
+            AGGRESSIVE,
+            CONTROLLED
+        }
+
+        /**
+         * @param configuration
+         *            optional {@code mode} key (case-insensitive, defaults to AGGRESSIVE); CONTROLLED mode
+         *            additionally requires {@link TweetGenerator#KEY_TPS}
+         * @throws IllegalArgumentException
+         *             if the mode is unknown or CONTROLLED mode is selected without a TPS value
+         */
+        public DataProvider(Map<String, String> configuration, int partition, OutputStream os) throws Exception {
+            this.tweetGenerator = new TweetGenerator(configuration, partition);
+            this.tweetGenerator.registerSubscriber(os);
+            this.os = os;
+            String modeValue = configuration.get(KEY_MODE);
+            // Locale.ROOT keeps the enum lookup independent of the default locale (e.g. Turkish dotted I)
+            mode = modeValue != null ? Mode.valueOf(modeValue.toUpperCase(java.util.Locale.ROOT))
+                    : Mode.AGGRESSIVE;
+            switch (mode) {
+                case CONTROLLED:
+                    String tpsValue = configuration.get(TweetGenerator.KEY_TPS);
+                    if (tpsValue == null) {
+                        throw new IllegalArgumentException("TPS value not configured. use tps=<value>");
+                    }
+                    batchSize = Integer.parseInt(tpsValue);
+                    break;
+                case AGGRESSIVE:
+                    batchSize = 5000;
+                    break;
+            }
+        }
+
+        @Override
+        public void run() {
+            boolean moreData = true;
+            long startBatch;
+            long endBatch;
+
+            while (true) {
+                try {
+                    while (moreData && continuePush) {
+                        switch (mode) {
+                            case AGGRESSIVE:
+                                moreData = tweetGenerator.generateNextBatch(batchSize);
+                                break;
+                            case CONTROLLED:
+                                // one batch per second: sleep for whatever is left of the second
+                                startBatch = System.currentTimeMillis();
+                                moreData = tweetGenerator.generateNextBatch(batchSize);
+                                endBatch = System.currentTimeMillis();
+                                if (endBatch - startBatch < 1000) {
+                                    Thread.sleep(1000 - (endBatch - startBatch));
+                                }
+                                break;
+                        }
+                    }
+                    os.close();
+                    break;
+                } catch (InterruptedException e) {
+                    // an interrupt is a request to terminate: restore the flag, release the pipe and exit
+                    // instead of retrying
+                    Thread.currentThread().interrupt();
+                    closeOutput();
+                    break;
+                } catch (Exception e) {
+                    // other failures are logged and the loop is retried
+                    if (LOGGER.isLoggable(Level.WARNING)) {
+                        LOGGER.log(Level.WARNING, "Exception in adaptor " + e.getMessage(), e);
+                    }
+                }
+            }
+        }
+
+        /** Closes the output stream, logging (not propagating) a failure to do so. */
+        private void closeOutput() {
+            try {
+                os.close();
+            } catch (IOException e) {
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.log(Level.WARNING, "Exception in adaptor " + e.getMessage(), e);
+                }
+            }
+        }
+
+        /** Signals {@link #run()} to stop generating further batches. */
+        public void stop() {
+            continuePush = false;
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java
index 9e35617..2cab3a7 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java
@@ -80,7 +80,7 @@ public class ExternalLookupOperatorDescriptor extends AbstractTreeIndexOperatorD
                 try {
                     adapter = adapterFactory.createAdapter(ctx, partition,
                             recordDescProvider.getInputRecordDescriptor(getActivityId(), 0), snapshotAccessor, writer);
-                    //Open the file index accessor here
+                    // Open the file index accessor here
                     snapshotAccessor.open();
                     indexOpen = true;
                     adapter.open();
@@ -127,6 +127,11 @@ public class ExternalLookupOperatorDescriptor extends AbstractTreeIndexOperatorD
                     throw new HyracksDataException(th);
                 }
             }
+
+            @Override
+            public void flush() throws HyracksDataException {
+                adapter.flush();
+            }
         };
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
index 80a54be..fa2e513 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
@@ -25,9 +25,9 @@ import java.util.logging.Logger;
 
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.external.feed.api.IFeedManager;
-import org.apache.asterix.external.feed.api.ISubscribableRuntime;
 import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
 import org.apache.asterix.external.feed.api.IFeedRuntime.Mode;
+import org.apache.asterix.external.feed.api.ISubscribableRuntime;
 import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter;
 import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
@@ -183,6 +183,9 @@ public class FeedMetaComputeNodePushable extends AbstractUnaryInputUnaryOutputOp
                     inputSideHandler.nextFrame(null); // signal end of data
                     while (!inputSideHandler.isFinished()) {
                         synchronized (coreOperator) {
+                            if (inputSideHandler.isFinished()) {
+                                break;
+                            }
                             coreOperator.wait();
                         }
                     }
@@ -192,8 +195,8 @@ public class FeedMetaComputeNodePushable extends AbstractUnaryInputUnaryOutputOp
             }
             coreOperator.close();
             System.out.println("CLOSED " + coreOperator + " STALLED ?" + stalled + " ENDED " + end);
-        } catch (Exception e) {
-            e.printStackTrace();
+        } catch (InterruptedException e) {
+            throw new HyracksDataException(e);
         } finally {
             if (!stalled) {
                 deregister();
@@ -221,4 +224,9 @@ public class FeedMetaComputeNodePushable extends AbstractUnaryInputUnaryOutputOp
         }
     }
 
+    @Override
+    public void flush() throws HyracksDataException {
+        inputSideHandler.flush();
+    }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaNodePushable.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaNodePushable.java
index 4dae72d..b09504a 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaNodePushable.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaNodePushable.java
@@ -181,4 +181,9 @@ public class FeedMetaNodePushable extends AbstractUnaryInputUnaryOutputOperatorN
         }
     }
 
+    @Override
+    public void flush() throws HyracksDataException {
+        inputSideHandler.flush();
+    }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
index f75b3eb..3c4c9ad 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
@@ -188,6 +188,9 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper
                 inputSideHandler.nextFrame(null); // signal end of data
                 while (!inputSideHandler.isFinished()) {
                     synchronized (coreOperator) {
+                        if (inputSideHandler.isFinished()) {
+                            break;
+                        }
                         coreOperator.wait();
                     }
                 }
@@ -195,8 +198,7 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper
             }
             coreOperator.close();
         } catch (Exception e) {
-            e.printStackTrace();
-            // ignore
+            throw new HyracksDataException(e);
         } finally {
             if (!stalled) {
                 deregister();
@@ -217,4 +219,9 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper
         }
     }
 
+    @Override
+    public void flush() throws HyracksDataException {
+        inputSideHandler.flush();
+    }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java
index 129b62f..14a3e2a 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java
@@ -39,7 +39,6 @@ import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.api.IRecordDataParser;
 import org.apache.asterix.external.api.IStreamDataParser;
-import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.om.base.ABoolean;
 import org.apache.asterix.om.base.ANull;
@@ -67,7 +66,6 @@ public class ADMDataParser extends AbstractDataParser implements IStreamDataPars
 
     private AdmLexer admLexer;
     private ARecordType recordType;
-    private boolean datasetRec;
     private boolean isStreamParser = true;
 
     private int nullableFieldId = 0;
@@ -142,7 +140,7 @@ public class ADMDataParser extends AbstractDataParser implements IStreamDataPars
     public boolean parse(DataOutput out) throws AsterixException {
         try {
             resetPools();
-            return parseAdmInstance(recordType, datasetRec, out);
+            return parseAdmInstance(recordType, out);
         } catch (IOException e) {
             throw new ParseException(e, filename, admLexer.getLine(), admLexer.getColumn());
         } catch (AdmLexerException e) {
@@ -163,12 +161,6 @@ public class ADMDataParser extends AbstractDataParser implements IStreamDataPars
     public void configure(Map<String, String> configuration, ARecordType recordType) throws IOException {
         this.recordType = recordType;
         this.configuration = configuration;
-        String isDatasetRecordString = configuration.get(ExternalDataConstants.KEY_DATASET_RECORD);
-        if (isDatasetRecordString == null) {
-            this.datasetRec = true;
-        } else {
-            this.datasetRec = Boolean.parseBoolean(isDatasetRecordString);
-        }
         this.isStreamParser = ExternalDataUtils.isDataSourceStreamProvider(configuration);
         if (!isStreamParser) {
             this.admLexer = new AdmLexer();
@@ -180,7 +172,7 @@ public class ADMDataParser extends AbstractDataParser implements IStreamDataPars
         try {
             resetPools();
             admLexer.setBuffer(record.get());
-            parseAdmInstance(recordType, datasetRec, out);
+            parseAdmInstance(recordType, out);
         } catch (IOException e) {
             throw new ParseException(e, filename, admLexer.getLine(), admLexer.getColumn());
         } catch (AdmLexerException e) {
@@ -201,18 +193,18 @@ public class ADMDataParser extends AbstractDataParser implements IStreamDataPars
         admLexer = new AdmLexer(new java.io.InputStreamReader(in));
     }
 
-    protected boolean parseAdmInstance(IAType objectType, boolean datasetRec, DataOutput out)
+    protected boolean parseAdmInstance(IAType objectType, DataOutput out)
             throws AsterixException, IOException, AdmLexerException {
         int token = admLexer.next();
         if (token == AdmLexer.TOKEN_EOF) {
             return false;
         } else {
-            admFromLexerStream(token, objectType, out, datasetRec);
+            admFromLexerStream(token, objectType, out);
             return true;
         }
     }
 
-    private void admFromLexerStream(int token, IAType objectType, DataOutput out, Boolean datasetRec)
+    private void admFromLexerStream(int token, IAType objectType, DataOutput out)
             throws AsterixException, IOException, AdmLexerException {
 
         switch (token) {
@@ -441,7 +433,7 @@ public class ADMDataParser extends AbstractDataParser implements IStreamDataPars
             case AdmLexer.TOKEN_START_RECORD: {
                 if (checkType(ATypeTag.RECORD, objectType)) {
                     objectType = getComplexType(objectType, ATypeTag.RECORD);
-                    parseRecord((ARecordType) objectType, out, datasetRec);
+                    parseRecord((ARecordType) objectType, out);
                 } else {
                     throw new ParseException(mismatchErrorMessage + objectType.getTypeTag());
                 }
@@ -567,7 +559,7 @@ public class ADMDataParser extends AbstractDataParser implements IStreamDataPars
         return getTargetTypeTag(expectedTypeTag, aObjectType) != null;
     }
 
-    private void parseRecord(ARecordType recType, DataOutput out, Boolean datasetRec)
+    private void parseRecord(ARecordType recType, DataOutput out)
             throws IOException, AsterixException, AdmLexerException {
 
         ArrayBackedValueStorage fieldValueBuffer = getTempBuffer();
@@ -575,16 +567,8 @@ public class ADMDataParser extends AbstractDataParser implements IStreamDataPars
         IARecordBuilder recBuilder = getRecordBuilder();
 
         BitSet nulls = null;
-        if (datasetRec) {
-
-            if (recType != null) {
-                nulls = new BitSet(recType.getFieldNames().length);
-                recBuilder.reset(recType);
-            } else {
-                recBuilder.reset(null);
-            }
-
-        } else if (recType != null) {
+        if (recType != null) {
+            //TODO: use BitSet Pool
             nulls = new BitSet(recType.getFieldNames().length);
             recBuilder.reset(recType);
         } else {
@@ -650,7 +634,7 @@ public class ADMDataParser extends AbstractDataParser implements IStreamDataPars
                     }
 
                     token = admLexer.next();
-                    this.admFromLexerStream(token, fieldType, fieldValueBuffer.getDataOutput(), false);
+                    this.admFromLexerStream(token, fieldType, fieldValueBuffer.getDataOutput());
                     if (openRecordField) {
                         if (fieldValueBuffer.getByteArray()[0] != ATypeTag.NULL.serialize()) {
                             recBuilder.addField(fieldNameBuffer, fieldValueBuffer);
@@ -752,7 +736,7 @@ public class ADMDataParser extends AbstractDataParser implements IStreamDataPars
                 expectingListItem = false;
                 itemBuffer.reset();
 
-                admFromLexerStream(token, itemType, itemBuffer.getDataOutput(), false);
+                admFromLexerStream(token, itemType, itemBuffer.getDataOutput());
                 orderedListBuilder.addItem(itemBuffer);
             }
             first = false;
@@ -799,7 +783,7 @@ public class ADMDataParser extends AbstractDataParser implements IStreamDataPars
             } else {
                 expectingListItem = false;
                 itemBuffer.reset();
-                admFromLexerStream(token, itemType, itemBuffer.getDataOutput(), false);
+                admFromLexerStream(token, itemType, itemBuffer.getDataOutput());
                 unorderedListBuilder.addItem(itemBuffer);
             }
             first = false;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
index 6c399c3..2f0fc86 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
@@ -28,6 +28,7 @@ import org.apache.asterix.builders.IARecordBuilder;
 import org.apache.asterix.builders.RecordBuilder;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.dataflow.data.nontagged.serde.ANullSerializerDeserializer;
+import org.apache.asterix.external.api.IDataParser;
 import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.api.IRecordDataParser;
@@ -124,18 +125,6 @@ public class DelimitedDataParser extends AbstractDataParser implements IStreamDa
         }
     }
 
-    protected void fieldNameToBytes(String fieldName, AMutableString str, ArrayBackedValueStorage buffer)
-            throws HyracksDataException {
-        buffer.reset();
-        DataOutput out = buffer.getDataOutput();
-        str.setValue(fieldName);
-        try {
-            stringSerde.serialize(str, out);
-        } catch (IOException e) {
-            throw new HyracksDataException(e);
-        }
-    }
-
     @Override
     public DataSourceType getDataSourceType() {
         return isStreamParser ? DataSourceType.STREAM : DataSourceType.RECORDS;
@@ -173,7 +162,8 @@ public class DelimitedDataParser extends AbstractDataParser implements IStreamDa
                     throw new HyracksDataException("Illegal field " + name + " in closed type " + recordType);
                 } else {
                     nameBuffers[i] = new ArrayBackedValueStorage();
-                    fieldNameToBytes(name, str, nameBuffers[i]);
+                    str.setValue(name);
+                    IDataParser.toBytes(str, nameBuffers[i], stringSerde);
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithMetadataParser.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithMetadataParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithMetadataParser.java
new file mode 100644
index 0000000..ecdb03d
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithMetadataParser.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.parser;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.asterix.builders.RecordBuilder;
+import org.apache.asterix.external.api.IDataParser;
+import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordDataParser;
+import org.apache.asterix.external.input.record.RecordWithMetadata;
+import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import org.apache.asterix.om.base.AMutableString;
+import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+
+public class RecordWithMetadataParser<T> implements IRecordDataParser<RecordWithMetadata<T>> {
+
+    private final Class<? extends RecordWithMetadata<T>> clazz;
+    private final int[] metaIndexes;
+    private final int valueIndex;
+    private ARecordType recordType;
+    private IRecordDataParser<T> valueParser;
+    private RecordBuilder recBuilder;
+    private ArrayBackedValueStorage[] nameBuffers;
+    private int numberOfFields;
+    private ArrayBackedValueStorage valueBuffer = new ArrayBackedValueStorage();
+    @SuppressWarnings("unchecked")
+    private ISerializerDeserializer<AString> stringSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.ASTRING);
+
+    public RecordWithMetadataParser(Class<? extends RecordWithMetadata<T>> clazz, int[] metaIndexes,
+            IRecordDataParser<T> valueParser, int valueIndex) {
+        this.clazz = clazz;
+        this.metaIndexes = metaIndexes;
+        this.valueParser = valueParser;
+        this.valueIndex = valueIndex;
+    }
+
+    @Override
+    public DataSourceType getDataSourceType() {
+        return DataSourceType.RECORDS;
+    }
+
+    @Override
+    public void configure(Map<String, String> configuration, ARecordType recordType)
+            throws HyracksDataException, IOException {
+        this.recordType = recordType;
+        this.numberOfFields = recordType.getFieldNames().length;
+        recBuilder = new RecordBuilder();
+        recBuilder.reset(recordType);
+        recBuilder.init();
+        nameBuffers = new ArrayBackedValueStorage[numberOfFields];
+        AMutableString str = new AMutableString(null);
+        for (int i = 0; i < numberOfFields; i++) {
+            String name = recordType.getFieldNames()[i];
+            nameBuffers[i] = new ArrayBackedValueStorage();
+            str.setValue(name);
+            IDataParser.toBytes(str, nameBuffers[i], stringSerde);
+        }
+    }
+
+    @Override
+    public Class<? extends RecordWithMetadata<T>> getRecordClass() {
+        return clazz;
+    }
+
+    @Override
+    public void parse(IRawRecord<? extends RecordWithMetadata<T>> record, DataOutput out) throws Exception {
+        recBuilder.reset(recordType);
+        valueBuffer.reset();
+        recBuilder.init();
+        RecordWithMetadata<T> rwm = record.get();
+        for (int i = 0; i < numberOfFields; i++) {
+            if (i == valueIndex) {
+                valueParser.parse(rwm.getRecord(), valueBuffer.getDataOutput());
+                recBuilder.addField(i, valueBuffer);
+            } else {
+                recBuilder.addField(i, rwm.getMetadata(metaIndexes[i]));
+            }
+        }
+        recBuilder.write(out, true);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RecordWithMetadataParserFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RecordWithMetadataParserFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RecordWithMetadataParserFactory.java
new file mode 100644
index 0000000..88a0683
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RecordWithMetadataParserFactory.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.parser.factory;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
+import org.apache.asterix.external.api.IRecordDataParser;
+import org.apache.asterix.external.api.IRecordDataParserFactory;
+import org.apache.asterix.external.input.record.RecordWithMetadata;
+import org.apache.asterix.external.parser.RecordWithMetadataParser;
+import org.apache.asterix.external.provider.ParserFactoryProvider;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class RecordWithMetadataParserFactory<T> implements IRecordDataParserFactory<RecordWithMetadata<T>> {
+
+    private static final long serialVersionUID = 1L;
+    private Class<? extends RecordWithMetadata<T>> recordClass;
+    private ARecordType recordType;
+    private int[] metaIndexes;
+    private IRecordDataParserFactory<T> valueParserFactory;
+    private int valueIndex;
+
+    @Override
+    public DataSourceType getDataSourceType() throws AsterixException {
+        return DataSourceType.RECORDS;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        // validation first
+        if (!configuration.containsKey(ExternalDataConstants.KEY_META_INDEXES)) {
+            throw new HyracksDataException(
+                    "the parser parameter (" + ExternalDataConstants.KEY_META_INDEXES + ") is missing");
+        }
+        if (!configuration.containsKey(ExternalDataConstants.KEY_VALUE_INDEX)) {
+            throw new HyracksDataException(
+                    "the parser parameter (" + ExternalDataConstants.KEY_VALUE_INDEX + ") is missing");
+        }
+        if (!configuration.containsKey(ExternalDataConstants.KEY_VALUE_FORMAT)) {
+            throw new HyracksDataException(
+                    "the parser parameter (" + ExternalDataConstants.KEY_VALUE_FORMAT + ") is missing");
+        }
+        // get meta field indexes
+        String[] stringMetaIndexes = configuration.get(ExternalDataConstants.KEY_META_INDEXES).split(",");
+        metaIndexes = new int[stringMetaIndexes.length];
+        for (int i = 0; i < stringMetaIndexes.length; i++) {
+            metaIndexes[i] = Integer.parseInt(stringMetaIndexes[i].trim());
+        }
+        // get value index
+        valueIndex = Integer.parseInt(configuration.get(ExternalDataConstants.KEY_VALUE_INDEX).trim());
+        // get value format
+        configuration.put(ExternalDataConstants.KEY_DATA_PARSER,
+                configuration.get(ExternalDataConstants.KEY_VALUE_FORMAT));
+        valueParserFactory = (IRecordDataParserFactory<T>) ParserFactoryProvider.getDataParserFactory(configuration);
+        valueParserFactory.setRecordType((ARecordType) recordType.getFieldTypes()[valueIndex]);
+        valueParserFactory.configure(configuration);
+        recordClass = (Class<? extends RecordWithMetadata<T>>) (new RecordWithMetadata<T>(
+                valueParserFactory.getRecordClass())).getClass();
+    }
+
+    @Override
+    public void setRecordType(ARecordType recordType) {
+        this.recordType = recordType;
+    }
+
+    @Override
+    public IRecordDataParser<RecordWithMetadata<T>> createRecordParser(IHyracksTaskContext ctx)
+            throws HyracksDataException, AsterixException, IOException {
+        IRecordDataParser<T> valueParser = valueParserFactory.createRecordParser(ctx);
+        return new RecordWithMetadataParser<T>(recordClass, metaIndexes, valueParser, valueIndex);
+    }
+
+    @Override
+    public Class<? extends RecordWithMetadata<T>> getRecordClass() {
+        return recordClass;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
index a7ab062..745c653 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
@@ -25,9 +25,10 @@ import org.apache.asterix.external.api.IExternalDataSourceFactory;
 import org.apache.asterix.external.api.IInputStreamProviderFactory;
 import org.apache.asterix.external.api.IRecordReaderFactory;
 import org.apache.asterix.external.input.HDFSDataSourceFactory;
-import org.apache.asterix.external.input.record.reader.factory.LineRecordReaderFactory;
-import org.apache.asterix.external.input.record.reader.factory.SemiStructuredRecordReaderFactory;
-import org.apache.asterix.external.input.record.reader.factory.TwitterRecordReaderFactory;
+import org.apache.asterix.external.input.record.reader.couchbase.CouchbaseReaderFactory;
+import org.apache.asterix.external.input.record.reader.stream.LineRecordReaderFactory;
+import org.apache.asterix.external.input.record.reader.stream.SemiStructuredRecordReaderFactory;
+import org.apache.asterix.external.input.record.reader.twitter.TwitterRecordReaderFactory;
 import org.apache.asterix.external.input.stream.factory.LocalFSInputStreamProviderFactory;
 import org.apache.asterix.external.input.stream.factory.SocketInputStreamProviderFactory;
 import org.apache.asterix.external.util.ExternalDataConstants;
@@ -97,8 +98,11 @@ public class DatasourceFactoryProvider {
                 case ExternalDataConstants.READER_TWITTER_PUSH:
                     readerFactory = new TwitterRecordReaderFactory();
                     break;
+                case ExternalDataConstants.READER_COUCHBASE:
+                    readerFactory = new CouchbaseReaderFactory();
+                    break;
                 default:
-                    throw new AsterixException("unknown record reader factory");
+                    throw new AsterixException("unknown record reader factory: " + reader);
             }
         }
         return readerFactory;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/provider/LookupReaderFactoryProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/LookupReaderFactoryProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/LookupReaderFactoryProvider.java
new file mode 100644
index 0000000..18b9cb5
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/LookupReaderFactoryProvider.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.provider;
+
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.ILookupReaderFactory;
+import org.apache.asterix.external.input.record.reader.hdfs.HDFSLookupReaderFactory;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.HDFSUtils;
+
+public class LookupReaderFactoryProvider {
+
+    @SuppressWarnings("rawtypes")
+    public static ILookupReaderFactory getLookupReaderFactory(Map<String, String> configuration) throws Exception {
+        String inputFormat = HDFSUtils.getInputFormatClassName(configuration);
+        if (inputFormat.equals(ExternalDataConstants.CLASS_NAME_TEXT_INPUT_FORMAT)
+                || inputFormat.equals(ExternalDataConstants.CLASS_NAME_SEQUENCE_INPUT_FORMAT)
+                || inputFormat.equals(ExternalDataConstants.CLASS_NAME_RC_INPUT_FORMAT)) {
+            HDFSLookupReaderFactory<Object> readerFactory = new HDFSLookupReaderFactory<Object>();
+            readerFactory.configure(configuration);
+            return readerFactory;
+        } else {
+            throw new AsterixException("Unrecognized external format");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
index f5a0512..30595db 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
@@ -22,10 +22,12 @@ import java.util.Map;
 
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.external.api.IDataParserFactory;
+import org.apache.asterix.external.input.record.RecordWithMetadata;
 import org.apache.asterix.external.parser.factory.ADMDataParserFactory;
 import org.apache.asterix.external.parser.factory.DelimitedDataParserFactory;
 import org.apache.asterix.external.parser.factory.HiveDataParserFactory;
 import org.apache.asterix.external.parser.factory.RSSParserFactory;
+import org.apache.asterix.external.parser.factory.RecordWithMetadataParserFactory;
 import org.apache.asterix.external.parser.factory.TweetParserFactory;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataUtils;
@@ -39,13 +41,12 @@ public class ParserFactoryProvider {
             return ExternalDataUtils.createExternalParserFactory(ExternalDataUtils.getDataverse(configuration),
                     parserFactoryName);
         } else {
-            parserFactory = ParserFactoryProvider.getParserFactory(configuration);
+            parserFactory = ParserFactoryProvider.getParserFactory(ExternalDataUtils.getRecordFormat(configuration));
         }
         return parserFactory;
     }
 
-    private static IDataParserFactory getParserFactory(Map<String, String> configuration) throws AsterixException {
-        String recordFormat = ExternalDataUtils.getRecordFormat(configuration);
+    private static IDataParserFactory getParserFactory(String recordFormat) throws AsterixException {
         switch (recordFormat) {
             case ExternalDataConstants.FORMAT_ADM:
             case ExternalDataConstants.FORMAT_JSON:
@@ -58,6 +59,8 @@ public class ParserFactoryProvider {
                 return new TweetParserFactory();
             case ExternalDataConstants.FORMAT_RSS:
                 return new RSSParserFactory();
+            case ExternalDataConstants.FORMAT_RECORD_WITH_META:
+                return new RecordWithMetadataParserFactory<RecordWithMetadata<?>>();
             default:
                 throw new AsterixException("Unknown data format");
         }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java
index 90c74e1..27a1d0e 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java
@@ -36,7 +36,7 @@ public class DataflowUtils {
     public static void addTupleToFrame(FrameTupleAppender appender, ArrayTupleBuilder tb, IFrameWriter writer)
             throws HyracksDataException {
         if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-            appender.flush(writer, true);
+            appender.write(writer, true);
             if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
                 throw new HyracksDataException("Tuple is too large for a frame");
             }


Mime
View raw message