asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject [09/13] incubator-asterixdb git commit: Improve Error Handling in Local Directory Feeds
Date Sat, 26 Mar 2016 20:27:40 GMT
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/121e1d9a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReaderFactory.java
index 68f10f6..4d44001 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReaderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReaderFactory.java
@@ -18,9 +18,9 @@
  */
 package org.apache.asterix.external.input.record.reader.stream;
 
+import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.asterix.external.api.IExternalIndexer;
 import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.input.stream.AInputStream;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -36,7 +36,7 @@ public class LineRecordReaderFactory extends AbstractStreamRecordReaderFactory<c
             throws HyracksDataException {
         String quoteString = configuration.get(ExternalDataConstants.KEY_QUOTE);
         boolean hasHeader = ExternalDataUtils.hasHeader(configuration);
-        Pair<AInputStream, IExternalIndexer> streamAndIndexer = getStreamAndIndexer(ctx, partition);
+        Pair<AsterixInputStream, IExternalIndexer> streamAndIndexer = getStreamAndIndexer(ctx, partition);
         if (quoteString != null) {
             return new QuotedLineRecordReader(hasHeader, streamAndIndexer.first, streamAndIndexer.second, quoteString);
         } else {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/121e1d9a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
index 6266aa2..abd2952 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
@@ -20,8 +20,8 @@ package org.apache.asterix.external.input.record.reader.stream;
 
 import java.io.IOException;
 
+import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.asterix.external.api.IExternalIndexer;
-import org.apache.asterix.external.input.stream.AInputStream;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataExceptionUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -32,8 +32,8 @@ public class QuotedLineRecordReader extends LineRecordReader {
     private boolean prevCharEscape;
     private boolean inQuote;
 
-    public QuotedLineRecordReader(final boolean hasHeader, final AInputStream stream, final IExternalIndexer indexer,
-            final String quoteString) throws HyracksDataException {
+    public QuotedLineRecordReader(final boolean hasHeader, final AsterixInputStream stream,
+            final IExternalIndexer indexer, final String quoteString) throws HyracksDataException {
         super(hasHeader, stream, indexer);
         if ((quoteString == null) || (quoteString.length() != 1)) {
             throw new HyracksDataException(ExternalDataExceptionUtils.incorrectParameterMessage(

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/121e1d9a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
index 678dd03..7339bfd 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
@@ -21,8 +21,8 @@ package org.apache.asterix.external.input.record.reader.stream;
 import java.io.IOException;
 
 import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.asterix.external.api.IExternalIndexer;
-import org.apache.asterix.external.input.stream.AInputStream;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataExceptionUtils;
 
@@ -35,7 +35,7 @@ public class SemiStructuredRecordReader extends AbstractStreamRecordReader {
     private char recordEnd;
     private int recordNumber = 0;
 
-    public SemiStructuredRecordReader(AInputStream stream, IExternalIndexer indexer, String recStartString,
+    public SemiStructuredRecordReader(AsterixInputStream stream, IExternalIndexer indexer, String recStartString,
             String recEndString) throws AsterixException {
         super(stream, indexer);
         // set record opening char
@@ -100,9 +100,7 @@ public class SemiStructuredRecordReader extends AbstractStreamRecordReader {
                             && inputBuffer[bufferPosn] != ExternalDataConstants.LF
                             && inputBuffer[bufferPosn] != ExternalDataConstants.CR) {
                         // corrupted file. clear the buffer and stop reading
-                        if (!reader.skipError()) {
-                            reader.close();
-                        }
+                        reader.reset();
                         bufferPosn = bufferLength = 0;
                         throw new IOException("Malformed input stream");
                     }
@@ -139,7 +137,13 @@ public class SemiStructuredRecordReader extends AbstractStreamRecordReader {
 
             int appendLength = bufferPosn - startPosn;
             if (appendLength > 0) {
-                record.append(inputBuffer, startPosn, appendLength);
+                try {
+                    record.append(inputBuffer, startPosn, appendLength);
+                } catch (IOException e) {
+                    reader.reset();
+                    bufferPosn = bufferLength = 0;
+                    throw new IOException("Malformed input stream");
+                }
             }
         } while (!hasFinished);
         record.endRecord();

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/121e1d9a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReaderFactory.java
index 206ae50..0f50204 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReaderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReaderFactory.java
@@ -19,9 +19,9 @@
 package org.apache.asterix.external.input.record.reader.stream;
 
 import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.asterix.external.api.IExternalIndexer;
 import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.input.stream.AInputStream;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -34,7 +34,7 @@ public class SemiStructuredRecordReaderFactory extends AbstractStreamRecordReade
     @Override
     public IRecordReader<? extends char[]> createRecordReader(IHyracksTaskContext ctx, int partition)
             throws HyracksDataException {
-        Pair<AInputStream, IExternalIndexer> streamAndIndexer = getStreamAndIndexer(ctx, partition);
+        Pair<AsterixInputStream, IExternalIndexer> streamAndIndexer = getStreamAndIndexer(ctx, partition);
         try {
             return new SemiStructuredRecordReader(streamAndIndexer.first, streamAndIndexer.second,
                     configuration.get(ExternalDataConstants.KEY_RECORD_START),

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/121e1d9a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
index be9ce06..e31325a 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
@@ -21,9 +21,9 @@ package org.apache.asterix.external.input.record.reader.twitter;
 import java.io.IOException;
 import java.util.List;
 
-import org.apache.asterix.external.api.IDataFlowController;
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
 import org.apache.asterix.external.input.record.GenericRecord;
 import org.apache.asterix.external.util.FeedLogManager;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -96,6 +96,11 @@ public class TwitterPullRecordReader implements IRecordReader<Status> {
     }
 
     @Override
-    public void setController(IDataFlowController controller) {
+    public void setController(AbstractFeedDataFlowController controller) {
+    }
+
+    @Override
+    public boolean handleException(Throwable th) {
+        return false;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/121e1d9a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
index 64695b5..f04cdb9 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
@@ -21,9 +21,9 @@ package org.apache.asterix.external.input.record.reader.twitter;
 import java.io.IOException;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import org.apache.asterix.external.api.IDataFlowController;
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
 import org.apache.asterix.external.input.record.GenericRecord;
 import org.apache.asterix.external.util.FeedLogManager;
 
@@ -131,6 +131,11 @@ public class TwitterPushRecordReader implements IRecordReader<Status> {
     }
 
     @Override
-    public void setController(IDataFlowController controller) {
+    public void setController(AbstractFeedDataFlowController controller) {
+    }
+
+    @Override
+    public boolean handleException(Throwable th) {
+        return false;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/121e1d9a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStream.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStream.java
deleted file mode 100644
index b78f96d..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStream.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.stream;
-
-import java.io.InputStream;
-
-import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
-import org.apache.asterix.external.util.FeedLogManager;
-
-public abstract class AInputStream extends InputStream {
-    public abstract boolean skipError() throws Exception;
-
-    public abstract boolean stop() throws Exception;
-
-    // TODO: Find a better way to send notifications
-    public abstract void setController(AbstractFeedDataFlowController controller);
-
-    // TODO: Find a better way to send notifications
-    public abstract void setFeedLogManager(FeedLogManager logManager);
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/121e1d9a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java
deleted file mode 100644
index bf85330..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.stream;
-
-import java.io.IOException;
-import java.io.Reader;
-import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
-import java.nio.charset.CharsetDecoder;
-import java.nio.charset.StandardCharsets;
-
-import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
-import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.FeedLogManager;
-
-public class AInputStreamReader extends Reader {
-    private AInputStream in;
-    private byte[] bytes = new byte[ExternalDataConstants.DEFAULT_BUFFER_SIZE];
-    private ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
-    private CharBuffer charBuffer = CharBuffer.allocate(ExternalDataConstants.DEFAULT_BUFFER_SIZE);
-    private CharsetDecoder decoder;
-    private boolean done = false;
-
-    public AInputStreamReader(AInputStream in) {
-        this.in = in;
-        this.decoder = StandardCharsets.UTF_8.newDecoder();
-        this.byteBuffer.flip();
-    }
-
-    public boolean skipError() throws Exception {
-        return in.skipError();
-    }
-
-    public void stop() throws IOException {
-        try {
-            in.stop();
-        } catch (Exception e) {
-            throw new IOException(e);
-        }
-    }
-
-    public void setController(AbstractFeedDataFlowController controller) {
-        in.setController(controller);
-    }
-
-    public void setFeedLogManager(FeedLogManager feedLogManager) {
-        in.setFeedLogManager(feedLogManager);
-    }
-
-    @Override
-    public int read(char cbuf[]) throws IOException {
-        return read(cbuf, 0, cbuf.length);
-    }
-
-    @Override
-    public int read(char cbuf[], int offset, int length) throws IOException {
-        if (done) {
-            return -1;
-        }
-        int len = 0;
-        charBuffer.clear();
-        while (charBuffer.position() == 0) {
-            if (byteBuffer.hasRemaining()) {
-                decoder.decode(byteBuffer, charBuffer, false);
-                System.arraycopy(charBuffer.array(), 0, cbuf, offset, charBuffer.position());
-                if (charBuffer.position() > 0) {
-                    return charBuffer.position();
-                } else {
-                    // need to read more data
-                    System.arraycopy(bytes, byteBuffer.position(), bytes, 0, byteBuffer.remaining());
-                    byteBuffer.position(byteBuffer.remaining());
-                    while (len == 0) {
-                        len = in.read(bytes, byteBuffer.position(), bytes.length - byteBuffer.position());
-                    }
-                }
-            } else {
-                byteBuffer.clear();
-                while (len == 0) {
-                    len = in.read(bytes, 0, bytes.length);
-                }
-            }
-            if (len == -1) {
-                done = true;
-                return len;
-            }
-            byteBuffer.position(len);
-            byteBuffer.flip();
-            decoder.decode(byteBuffer, charBuffer, false);
-            System.arraycopy(charBuffer.array(), 0, cbuf, offset, charBuffer.position());
-        }
-        return charBuffer.position();
-    }
-
-    @Override
-    public void close() throws IOException {
-        in.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/121e1d9a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java
new file mode 100644
index 0000000..b1ef892
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.FeedLogManager;
+
+public class AsterixInputStreamReader extends Reader {
+    private AsterixInputStream in;
+    private byte[] bytes = new byte[ExternalDataConstants.DEFAULT_BUFFER_SIZE];
+    private ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+    private CharBuffer charBuffer = CharBuffer.allocate(ExternalDataConstants.DEFAULT_BUFFER_SIZE);
+    private CharsetDecoder decoder;
+    private boolean done = false;
+
+    public AsterixInputStreamReader(AsterixInputStream in) {
+        this.in = in;
+        this.decoder = StandardCharsets.UTF_8.newDecoder();
+        this.byteBuffer.flip();
+    }
+
+    public void stop() throws IOException {
+        try {
+            in.stop();
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
+    }
+
+    public void setController(AbstractFeedDataFlowController controller) {
+        in.setController(controller);
+    }
+
+    public void setFeedLogManager(FeedLogManager feedLogManager) {
+        in.setFeedLogManager(feedLogManager);
+    }
+
+    @Override
+    public int read(char cbuf[]) throws IOException {
+        return read(cbuf, 0, cbuf.length);
+    }
+
+    @Override
+    public int read(char cbuf[], int offset, int length) throws IOException {
+        if (done) {
+            return -1;
+        }
+        int len = 0;
+        charBuffer.clear();
+        while (charBuffer.position() == 0) {
+            if (byteBuffer.hasRemaining()) {
+                decoder.decode(byteBuffer, charBuffer, false);
+                System.arraycopy(charBuffer.array(), 0, cbuf, offset, charBuffer.position());
+                if (charBuffer.position() > 0) {
+                    return charBuffer.position();
+                } else {
+                    // need to read more data
+                    System.arraycopy(bytes, byteBuffer.position(), bytes, 0, byteBuffer.remaining());
+                    byteBuffer.position(byteBuffer.remaining());
+                    while (len == 0) {
+                        len = in.read(bytes, byteBuffer.position(), bytes.length - byteBuffer.position());
+                    }
+                }
+            } else {
+                byteBuffer.clear();
+                while (len == 0) {
+                    len = in.read(bytes, 0, bytes.length);
+                }
+            }
+            if (len == -1) {
+                done = true;
+                return len;
+            }
+            byteBuffer.position(len);
+            byteBuffer.flip();
+            decoder.decode(byteBuffer, charBuffer, false);
+            System.arraycopy(charBuffer.array(), 0, cbuf, offset, charBuffer.position());
+        }
+        return charBuffer.position();
+    }
+
+    @Override
+    public void close() throws IOException {
+        in.close();
+    }
+
+    public boolean handleException(Throwable th) {
+        return in.handleException(th);
+    }
+
+    @Override
+    public void reset() throws IOException {
+        byteBuffer.clear();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/121e1d9a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/BasicInputStream.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/BasicInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/BasicInputStream.java
index 176f5f4..e4eeba3 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/BasicInputStream.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/BasicInputStream.java
@@ -21,10 +21,9 @@ package org.apache.asterix.external.input.stream;
 import java.io.IOException;
 import java.io.InputStream;
 
-import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
-import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.asterix.external.api.AsterixInputStream;
 
-public class BasicInputStream extends AInputStream {
+public class BasicInputStream extends AsterixInputStream {
     private final InputStream in;
 
     public BasicInputStream(InputStream in) {
@@ -78,20 +77,12 @@ public class BasicInputStream extends AInputStream {
     }
 
     @Override
-    public boolean skipError() {
-        return false;
-    }
-
-    @Override
     public boolean stop() throws Exception {
         return false;
     }
 
     @Override
-    public void setFeedLogManager(FeedLogManager logManager) {
-    }
-
-    @Override
-    public void setController(AbstractFeedDataFlowController controller) {
+    public boolean handleException(Throwable th) {
+        return false;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/121e1d9a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStream.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStream.java
new file mode 100644
index 0000000..063b8fa
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStream.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.api.IExternalIndexer;
+import org.apache.asterix.external.api.IIndexingDatasource;
+import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.external.input.record.reader.hdfs.EmptyRecordReader;
+import org.apache.asterix.external.provider.ExternalIndexerProvider;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Streams the text records of the input splits scheduled to this node as one byte stream, each record followed by
+ * a line feed.
+ * <p>
+ * Splits are claimed through the shared {@code read} flags so that partitions running on the same node never read
+ * the same split twice. When a files snapshot is supplied, splits whose file modification time differs from the
+ * snapshot are skipped, and the {@link IExternalIndexer} is reset whenever a new split is opened.
+ */
+public class HDFSInputStream extends AsterixInputStream implements IIndexingDatasource {
+
+    private RecordReader<Object, Text> reader;
+    private Text value = null;
+    private Object key = null;
+    private int currentSplitIndex = 0;
+    private final boolean[] read;
+    private final InputFormat<?, Text> inputFormat;
+    private final InputSplit[] inputSplits;
+    private final String[] readSchedule;
+    private final String nodeName;
+    private final JobConf conf;
+    // Indexing variables
+    private final IExternalIndexer indexer;
+    private final List<ExternalFile> snapshot;
+    private final FileSystem hdfs;
+    // Read position in the current record, seen as the bytes of value followed by one line feed.
+    // pos > value.getLength() means the record, including its line feed, has been fully returned.
+    private int pos;
+
+    /**
+     * Creates the stream and opens the first split scheduled to {@code nodeName}, if any.
+     *
+     * @param read
+     *            per-split "already claimed" flags, shared by the partitions of this node
+     * @param inputSplits
+     *            all input splits
+     * @param readSchedule
+     *            node name each split is scheduled to, indexed like {@code inputSplits}
+     * @param nodeName
+     *            name of the node this stream runs on
+     * @param conf
+     *            job configuration providing the input format and the file system
+     * @param configuration
+     *            external data configuration used to create the indexer
+     * @param snapshot
+     *            files snapshot used for indexing, or {@code null} when not indexing
+     */
+    @SuppressWarnings("unchecked")
+    public HDFSInputStream(boolean[] read, InputSplit[] inputSplits, String[] readSchedule, String nodeName,
+            JobConf conf, Map<String, String> configuration, List<ExternalFile> snapshot)
+            throws IOException, AsterixException {
+        this.read = read;
+        this.inputSplits = inputSplits;
+        this.readSchedule = readSchedule;
+        this.nodeName = nodeName;
+        this.conf = conf;
+        this.inputFormat = conf.getInputFormat();
+        this.reader = new EmptyRecordReader<Object, Text>();
+        this.snapshot = snapshot;
+        this.hdfs = FileSystem.get(conf);
+        nextInputSplit();
+        this.value = new Text();
+        // nothing loaded yet: mark the empty initial record as consumed so the first read fetches a real one
+        this.pos = value.getLength() + 1;
+        if (snapshot != null) {
+            this.indexer = ExternalIndexerProvider.getIndexer(configuration);
+            if (currentSplitIndex < snapshot.size()) {
+                indexer.reset(this);
+            }
+        } else {
+            this.indexer = null;
+        }
+    }
+
+    /**
+     * Reads one byte, loading the next record once the current one (including its line feed) is exhausted.
+     *
+     * @return the next byte as an unsigned value in {@code 0..255}, or {@code -1} when all splits are exhausted
+     */
+    @Override
+    public int read() throws IOException {
+        if (isRecordConsumed() && !readMore()) {
+            return -1;
+        }
+        if (pos == value.getLength()) {
+            // every content byte was returned; emit the record delimiter
+            pos++;
+            return ExternalDataConstants.BYTE_LF;
+        }
+        // mask to avoid sign extension: bytes >= 0x80 must not become negative (0xFF would read as EOF)
+        return value.getBytes()[pos++] & 0xFF;
+    }
+
+    /**
+     * Copies up to {@code len} bytes of the current record into {@code buffer}, loading the next record first if
+     * the current one is exhausted. A single call never copies bytes from two different records.
+     *
+     * @return the number of bytes copied, or {@code -1} when all splits are exhausted
+     */
+    @Override
+    public int read(byte[] buffer, int offset, int len) throws IOException {
+        if (len == 0) {
+            return 0;
+        }
+        if (isRecordConsumed() && !readMore()) {
+            return -1;
+        }
+        return readRecord(buffer, offset, len);
+    }
+
+    /**
+     * Copies the unread part of the current record followed by its line feed into {@code buffer}. When that does
+     * not fit, only {@code len} content bytes are copied and the line feed stays pending for the next call.
+     */
+    private int readRecord(byte[] buffer, int offset, int len) {
+        int contentLength = value.getLength();
+        // unread content bytes plus the trailing line feed
+        int remaining = contentLength + 1 - pos;
+        if (remaining > len) {
+            // remaining > len implies len <= contentLength - pos: only content bytes are copied
+            System.arraycopy(value.getBytes(), pos, buffer, offset, len);
+            pos += len;
+            return len;
+        }
+        int contentBytes = contentLength - pos;
+        System.arraycopy(value.getBytes(), pos, buffer, offset, contentBytes);
+        buffer[offset + contentBytes] = ExternalDataConstants.LF;
+        pos = contentLength + 1;
+        return remaining;
+    }
+
+    /** Returns true when every byte of the current record, including its line feed, has been returned. */
+    private boolean isRecordConsumed() {
+        return pos > value.getLength();
+    }
+
+    /**
+     * Loads the next record into {@code value}. At the end of the data the position is kept past the record, so
+     * further reads keep returning {@code -1} instead of replaying stale bytes.
+     */
+    private boolean readMore() throws IOException {
+        boolean loaded;
+        try {
+            loaded = hasNext();
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
+        pos = loaded ? 0 : value.getLength() + 1;
+        return loaded;
+    }
+
+    /** Always returns {@code false}; this stream takes no action on stop. */
+    @Override
+    public boolean stop() throws Exception {
+        return false;
+    }
+
+    /** Always returns {@code false}; no exception is handled by this stream. */
+    @Override
+    public boolean handleException(Throwable th) {
+        return false;
+    }
+
+    /** Closes the record reader of the current split. */
+    @Override
+    public void close() throws IOException {
+        reader.close();
+    }
+
+    /** Moves {@code reader} to the next record, opening further splits as the current one runs out. */
+    private boolean hasNext() throws Exception {
+        while (!reader.next(key, value)) {
+            if (!nextInputSplit()) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Opens the next unclaimed split scheduled to this node, skipping splits whose file does not match the
+     * snapshot.
+     *
+     * @return true if a split was opened, false if none is left
+     */
+    private boolean nextInputSplit() throws IOException {
+        for (; currentSplitIndex < inputSplits.length; currentSplitIndex++) {
+            // only read the splits scheduled to the current node
+            if (!readSchedule[currentSplitIndex].equals(nodeName)) {
+                continue;
+            }
+            // claim an unread split; the flags are shared by simultaneous partitions on the same machine
+            synchronized (read) {
+                if (read[currentSplitIndex]) {
+                    continue;
+                }
+                read[currentSplitIndex] = true;
+            }
+            if (snapshot != null) {
+                String fileName = ((FileSplit) (inputSplits[currentSplitIndex])).getPath().toUri().getPath();
+                FileStatus fileStatus = hdfs.getFileStatus(new Path(fileName));
+                // Skip if not the same file stored in the files snapshot
+                if (fileStatus.getModificationTime() != snapshot.get(currentSplitIndex).getLastModefiedTime()
+                        .getTime()) {
+                    continue;
+                }
+            }
+            reader.close();
+            reader = getRecordReader(currentSplitIndex);
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Opens the given split and stores its reader in {@code reader} before resetting the indexer (presumably so the
+     * indexer can reach it via {@link #getReader()}); key and value objects are created on first use.
+     */
+    @SuppressWarnings("unchecked")
+    private RecordReader<Object, Text> getRecordReader(int splitIndex) throws IOException {
+        reader = (RecordReader<Object, Text>) inputFormat.getRecordReader(inputSplits[splitIndex], conf, Reporter.NULL);
+        if (key == null) {
+            key = reader.createKey();
+            value = reader.createValue();
+        }
+        if (indexer != null) {
+            try {
+                indexer.reset(this);
+            } catch (Exception e) {
+                throw new HyracksDataException(e);
+            }
+        }
+        return reader;
+    }
+
+    @Override
+    public IExternalIndexer getIndexer() {
+        return indexer;
+    }
+
+    @Override
+    public List<ExternalFile> getSnapshot() {
+        return snapshot;
+    }
+
+    @Override
+    public int getCurrentSplitIndex() {
+        return currentSplitIndex;
+    }
+
+    @Override
+    public RecordReader<?, ? extends Writable> getReader() {
+        return reader;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/121e1d9a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
new file mode 100644
index 0000000..00c1eb7
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Map;
+
+import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.asterix.external.util.FileSystemWatcher;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.std.file.FileSplit;
+import org.apache.log4j.Logger;
+
+/**
+ * Concatenates the local files delivered by a {@link FileSystemWatcher} into a single byte stream, inserting a line
+ * feed between files when a file does not already end with a line break.
+ */
+public class LocalFSInputStream extends AsterixInputStream {
+
+    private static final Logger LOGGER = Logger.getLogger(LocalFSInputStream.class.getName());
+    private static final byte BYTE_CR = '\r';
+    private final Path path;
+    private final FileSystemWatcher watcher;
+    private FileInputStream in;
+    private byte lastByte;
+    private File currentFile;
+
+    /**
+     * Creates the stream over the local path of {@code fileSplits[partition]} and initializes its file watcher.
+     *
+     * @param expression
+     *            handed to the {@link FileSystemWatcher}, presumably to select which files are read
+     * @param isFeed
+     *            handed to the {@link FileSystemWatcher}
+     */
+    public LocalFSInputStream(final FileSplit[] fileSplits, final IHyracksTaskContext ctx,
+            final Map<String, String> configuration, final int partition, final String expression, final boolean isFeed)
+            throws IOException {
+        this.path = fileSplits[partition].getLocalFile().getFile().toPath();
+        this.watcher = new FileSystemWatcher(path, expression, isFeed);
+        this.watcher.init();
+    }
+
+    @Override
+    public void setFeedLogManager(FeedLogManager logManager) {
+        super.setFeedLogManager(logManager);
+        watcher.setFeedLogManager(logManager);
+    }
+
+    @Override
+    public void setController(AbstractFeedDataFlowController controller) {
+        super.setController(controller);
+        watcher.setController(controller);
+    }
+
+    /**
+     * Closes the current file and the watcher. Both are always attempted; if both fail, the watcher failure is
+     * attached as suppressed to the file failure.
+     */
+    @Override
+    public void close() throws IOException {
+        IOException ioe = null;
+        try {
+            closeFile();
+        } catch (Exception e) {
+            ioe = new IOException(e);
+        }
+        try {
+            watcher.close();
+        } catch (Exception e) {
+            if (ioe == null) {
+                throw e;
+            }
+            ioe.addSuppressed(e);
+        }
+        if (ioe != null) {
+            // a failure to close the file must be reported even when the watcher closed cleanly
+            throw ioe;
+        }
+    }
+
+    /** Closes the current file, if any, and forgets it even when closing fails. */
+    private void closeFile() throws IOException {
+        if (in != null) {
+            try {
+                in.close();
+            } finally {
+                in = null;
+                currentFile = null;
+            }
+        }
+    }
+
+    /**
+     * Closes the current file and opens the next one offered by the watcher, if any.
+     *
+     * @return true if a new file was opened, false if the watcher has no next file
+     */
+    private boolean advance() throws IOException {
+        closeFile();
+        if (watcher.hasNext()) {
+            currentFile = watcher.next();
+            in = new FileInputStream(currentFile);
+            return true;
+        }
+        return false;
+    }
+
+    /** Single-byte reads are not supported; use {@link #read(byte[], int, int)}. */
+    @Override
+    public int read() throws IOException {
+        throw new HyracksDataException(
+                "read() is not supported with this stream. use read(byte[] b, int off, int len)");
+    }
+
+    /**
+     * Reads from the current file, moving on to the next file at end of file. When the previous file did not end
+     * with a line break, a single line feed is returned first so that the last line of one file is not joined with
+     * the first line of the next.
+     *
+     * @return the number of bytes read, or {@code -1} when the watcher has no more files
+     */
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (in == null) {
+            if (!advance()) {
+                return -1;
+            }
+        }
+        int result = in.read(b, off, len);
+        while ((result < 0) && advance()) {
+            // NOTE: emitting a line feed between files assumes a line-oriented parser.
+            // A file whose last byte is already LF or CR needs no extra line break.
+            if ((lastByte != ExternalDataConstants.BYTE_LF) && (lastByte != BYTE_CR)) {
+                lastByte = ExternalDataConstants.BYTE_LF;
+                b[off] = ExternalDataConstants.BYTE_LF;
+                return 1;
+            }
+            // the separator is already in place: continue with the newly opened file
+            result = in.read(b, off, len);
+        }
+        if (result > 0) {
+            lastByte = b[(off + result) - 1];
+        }
+        return result;
+    }
+
+    /** Closes the watcher; always returns {@code true}. */
+    @Override
+    public boolean stop() throws Exception {
+        watcher.close();
+        return true;
+    }
+
+    /**
+     * Tries to recover from a read failure. For an {@link IOException} caused by a malformed input stream, the
+     * current file is reported to the feed log as corrupted and skipped. For any other {@link IOException} the
+     * watcher is re-initialized.
+     *
+     * @return true if the failure was handled, false otherwise
+     */
+    @Override
+    public boolean handleException(Throwable th) {
+        if (in == null) {
+            return false;
+        }
+        if (!(th instanceof IOException)) {
+            return false;
+        }
+        // TODO: Change from string check to exception type
+        if (isMalformedInput(th)) {
+            if (currentFile != null) {
+                try {
+                    logManager.logRecord(currentFile.getAbsolutePath(), "Corrupted input file");
+                } catch (IOException e) {
+                    LOGGER.warn("Failed to write to feed log file", e);
+                }
+                LOGGER.warn("Corrupted input file: " + currentFile.getAbsolutePath());
+            }
+            try {
+                advance();
+                return true;
+            } catch (Exception e) {
+                return false;
+            }
+        }
+        try {
+            watcher.init();
+        } catch (IOException e) {
+            LOGGER.warn("Failed to initialize watcher during failure recovery", e);
+            return false;
+        }
+        return true;
+    }
+
+    /** Returns true if the cause of {@code th} carries the "Malformed input stream" message; null-safe. */
+    private static boolean isMalformedInput(Throwable th) {
+        Throwable cause = th.getCause();
+        String message = cause == null ? null : cause.getMessage();
+        return message != null && message.contains("Malformed input stream");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/121e1d9a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFileSystemInputStream.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFileSystemInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFileSystemInputStream.java
deleted file mode 100644
index dc6a130..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFileSystemInputStream.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.stream;
-
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.nio.file.Path;
-
-import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
-import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.FeedLogManager;
-import org.apache.asterix.external.util.FileSystemWatcher;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class LocalFileSystemInputStream extends AInputStream {
-    private final FileSystemWatcher watcher;
-    private FileInputStream in;
-    private byte lastByte;
-
-    public LocalFileSystemInputStream(Path inputResource, String expression, boolean isFeed)
-            throws HyracksDataException {
-        this.watcher = new FileSystemWatcher(inputResource, expression, isFeed);
-        watcher.init();
-    }
-
-    @Override
-    public void setFeedLogManager(FeedLogManager logManager) {
-        watcher.setFeedLogManager(logManager);
-    }
-
-    @Override
-    public void setController(AbstractFeedDataFlowController controller) {
-        watcher.setController(controller);
-    }
-
-    @Override
-    public void close() throws IOException {
-        IOException ioe = null;
-        if (in != null) {
-            try {
-                closeFile();
-            } catch (Exception e) {
-                ioe = new IOException(e);
-            }
-        }
-        try {
-            watcher.close();
-        } catch (Exception e) {
-            if (ioe == null) {
-                throw e;
-            }
-            ioe.addSuppressed(e);
-            throw ioe;
-        }
-    }
-
-    private void closeFile() throws IOException {
-        if (in != null) {
-            try {
-                in.close();
-            } finally {
-                in = null;
-            }
-        }
-    }
-
-    /**
-     * Closes the current input stream and opens the next one, if any.
-     */
-    private boolean advance() throws IOException {
-        closeFile();
-        if (watcher.hasNext()) {
-            in = new FileInputStream(watcher.next());
-            return true;
-        }
-        return false;
-    }
-
-    @Override
-    public int read() throws IOException {
-        throw new HyracksDataException(
-                "read() is not supported with this stream. use read(byte[] b, int off, int len)");
-    }
-
-    @Override
-    public int read(byte[] b, int off, int len) throws IOException {
-        if (in == null) {
-            if (!advance()) {
-                return -1;
-            }
-        }
-        int result = in.read(b, off, len);
-        while ((result < 0) && advance()) {
-            // return a new line at the end of every file <--Might create problems for some cases
-            // depending on the parser implementation-->
-            if ((lastByte != ExternalDataConstants.BYTE_LF) && (lastByte != ExternalDataConstants.BYTE_LF)) {
-                lastByte = ExternalDataConstants.BYTE_LF;
-                b[off] = ExternalDataConstants.BYTE_LF;
-                return 1;
-            }
-            // recursive call
-            result = in.read(b, off, len);
-        }
-        if (result > 0) {
-            lastByte = b[(off + result) - 1];
-        }
-        return result;
-    }
-
-    @Override
-    public boolean skipError() throws Exception {
-        advance();
-        return true;
-    }
-
-    @Override
-    public boolean stop() throws Exception {
-        watcher.close();
-        return true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/121e1d9a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketClientInputStream.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketClientInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketClientInputStream.java
new file mode 100644
index 0000000..4321bcd
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketClientInputStream.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.Socket;
+import java.net.UnknownHostException;
+
+import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+
+/**
+ * Reads the bytes sent by a server over a client {@link Socket} connected to the given host and port.
+ */
+public class SocketClientInputStream extends AsterixInputStream {
+
+    private final Socket socket;
+    private final InputStream in;
+
+    /**
+     * Connects to {@code address.first} on port {@code address.second}. The socket is closed again if its input
+     * stream cannot be obtained.
+     */
+    public SocketClientInputStream(Pair<String, Integer> address) throws UnknownHostException, IOException {
+        Socket connection = new Socket(address.first, address.second);
+        try {
+            this.in = connection.getInputStream();
+        } catch (IOException e) {
+            try {
+                connection.close();
+            } catch (IOException closeFailure) {
+                e.addSuppressed(closeFailure);
+            }
+            throw e;
+        }
+        this.socket = connection;
+    }
+
+    @Override
+    public int read() throws IOException {
+        return in.read();
+    }
+
+    @Override
+    public int read(byte[] buffer, int offset, int length) throws IOException {
+        return in.read(buffer, offset, length);
+    }
+
+    /** Closes the socket if it is still open; always returns {@code true}. */
+    @Override
+    public boolean stop() throws Exception {
+        if (!socket.isClosed()) {
+            socket.close();
+        }
+        return true;
+    }
+
+    /** Always returns {@code false}; no exception is handled by this stream. */
+    @Override
+    public boolean handleException(Throwable th) {
+        return false;
+    }
+
+    /** Closes the socket, which also closes its input stream, so the connection is not leaked. */
+    @Override
+    public void close() throws IOException {
+        try {
+            socket.close();
+        } finally {
+            super.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/121e1d9a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java
index 1c33709..ce1d893 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java
@@ -23,16 +23,16 @@ import java.io.InputStream;
 import java.net.ServerSocket;
 import java.net.Socket;
 
-import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.asterix.external.util.ExternalDataExceptionUtils;
-import org.apache.asterix.external.util.FeedLogManager;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.log4j.Logger;
 
-public class SocketServerInputStream extends AInputStream {
+public class SocketServerInputStream extends AsterixInputStream {
+    private static final Logger LOGGER = Logger.getLogger(SocketServerInputStream.class.getName());
     private ServerSocket server;
     private Socket socket;
     private InputStream connectionStream;
-    private AbstractFeedDataFlowController controller;
 
     public SocketServerInputStream(ServerSocket server) {
         this.server = server;
@@ -56,12 +56,6 @@ public class SocketServerInputStream extends AInputStream {
     }
 
     @Override
-    public boolean skipError() throws Exception {
-        accept();
-        return true;
-    }
-
-    @Override
     public int read(byte b[]) throws IOException {
         return read(b, 0, b.length);
     }
@@ -160,11 +154,13 @@ public class SocketServerInputStream extends AInputStream {
     }
 
     @Override
-    public void setFeedLogManager(FeedLogManager logManager) {
-    }
-
-    @Override
-    public void setController(AbstractFeedDataFlowController controller) {
-        this.controller = controller;
+    public boolean handleException(Throwable th) {
+        try {
+            accept();
+        } catch (IOException e) {
+            LOGGER.warn("Failed accepting more connections", e);
+            return false;
+        }
+        return true;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/121e1d9a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/TwitterFirehoseInputStream.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/TwitterFirehoseInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/TwitterFirehoseInputStream.java
new file mode 100644
index 0000000..e2afd7b
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/TwitterFirehoseInputStream.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.util.TweetGenerator;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * Exposes the tweets produced by a {@link TweetGenerator} as an input stream. The generator runs on a background
+ * task that writes into a pipe; the task is started by the first read.
+ */
+public class TwitterFirehoseInputStream extends AsterixInputStream {
+
+    private static final Logger LOGGER = Logger.getLogger(TwitterFirehoseInputStream.class.getName());
+    private final ExecutorService executorService;
+    private final PipedOutputStream outputStream;
+    private final PipedInputStream inputStream;
+    private final DataProvider dataProvider;
+    // read without the lock by read(); start() re-checks it under the lock
+    private volatile boolean started;
+
+    public TwitterFirehoseInputStream(Map<String, String> configuration, IHyracksTaskContext ctx, int partition)
+            throws IOException {
+        executorService = Executors.newCachedThreadPool();
+        outputStream = new PipedOutputStream();
+        inputStream = new PipedInputStream(outputStream);
+        dataProvider = new DataProvider(configuration, partition, outputStream);
+        started = false;
+    }
+
+    /** Asks the data provider to stop producing; always returns {@code true}. */
+    @Override
+    public boolean stop() throws IOException {
+        dataProvider.stop();
+        return true;
+    }
+
+    /**
+     * Starts the data provider on the executor, at most once. The provider is the only task ever submitted, so the
+     * executor is shut down right away; its worker thread then exits as soon as the provider returns.
+     */
+    public synchronized void start() {
+        if (!started) {
+            executorService.execute(dataProvider);
+            executorService.shutdown();
+            started = true;
+        }
+    }
+
+    @Override
+    public int read() throws IOException {
+        if (!started) {
+            start();
+        }
+        return inputStream.read();
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (!started) {
+            start();
+        }
+        return inputStream.read(b, off, len);
+    }
+
+    /** Always returns {@code false}; no exception is handled by this stream. */
+    @Override
+    public boolean handleException(Throwable th) {
+        return false;
+    }
+
+    /**
+     * Generates tweet batches with a {@link TweetGenerator} subscribed to the pipe's output stream until the
+     * generator reports no more data or {@link #stop()} is called, then closes the output stream.
+     */
+    private static class DataProvider implements Runnable {
+
+        public static final String KEY_MODE = "mode";
+
+        private final TweetGenerator tweetGenerator;
+        // written by stop() from another thread and read by the producing loop, hence volatile
+        private volatile boolean continuePush = true;
+        private int batchSize;
+        private final Mode mode;
+        private final OutputStream os;
+
+        /** AGGRESSIVE generates batches back to back; CONTROLLED paces them to at most one batch per second. */
+        public static enum Mode {
+            AGGRESSIVE,
+            CONTROLLED
+        }
+
+        public DataProvider(Map<String, String> configuration, int partition, OutputStream os) {
+            this.tweetGenerator = new TweetGenerator(configuration, partition);
+            this.tweetGenerator.registerSubscriber(os);
+            this.os = os;
+            String modeValue = configuration.get(KEY_MODE);
+            // Locale.ROOT: under a Turkish default locale the 'i' of "aggressive" upper-cases to a dotted capital I
+            mode = modeValue != null ? Mode.valueOf(modeValue.toUpperCase(java.util.Locale.ROOT))
+                    : Mode.AGGRESSIVE;
+            switch (mode) {
+                case CONTROLLED:
+                    String tpsValue = configuration.get(TweetGenerator.KEY_TPS);
+                    if (tpsValue == null) {
+                        throw new IllegalArgumentException("TPS value not configured. use tps=<value>");
+                    }
+                    batchSize = Integer.parseInt(tpsValue);
+                    break;
+                case AGGRESSIVE:
+                    batchSize = 5000;
+                    break;
+            }
+        }
+
+        @Override
+        public void run() {
+            boolean moreData = true;
+            long startBatch;
+            long endBatch;
+            while (true) {
+                try {
+                    while (moreData && continuePush) {
+                        switch (mode) {
+                            case AGGRESSIVE:
+                                moreData = tweetGenerator.generateNextBatch(batchSize);
+                                break;
+                            case CONTROLLED:
+                                startBatch = System.currentTimeMillis();
+                                moreData = tweetGenerator.generateNextBatch(batchSize);
+                                endBatch = System.currentTimeMillis();
+                                if ((endBatch - startBatch) < 1000) {
+                                    Thread.sleep(1000 - (endBatch - startBatch));
+                                }
+                                break;
+                        }
+                    }
+                    os.close();
+                    break;
+                } catch (InterruptedException e) {
+                    // interrupted while pacing: keep the interrupt status and end the stream instead of retrying
+                    Thread.currentThread().interrupt();
+                    closeOutput();
+                    break;
+                } catch (Exception e) {
+                    if (LOGGER.isLoggable(Level.WARNING)) {
+                        LOGGER.log(Level.WARNING, "Exception in adapter " + e.getMessage(), e);
+                    }
+                }
+            }
+        }
+
+        /** Closes the pipe's output stream so that the reading side reaches the end of the stream. */
+        private void closeOutput() {
+            try {
+                os.close();
+            } catch (IOException e) {
+                LOGGER.log(Level.WARNING, "Failed to close the tweet pipe", e);
+            }
+        }
+
+        /** Makes the producing loop in {@link #run()} stop before its next batch. */
+        public void stop() {
+            continuePush = false;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/121e1d9a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
new file mode 100644
index 0000000..85d0e41
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream.factory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.api.IInputStreamFactory;
+import org.apache.asterix.external.api.INodeResolver;
+import org.apache.asterix.external.api.INodeResolverFactory;
+import org.apache.asterix.external.input.stream.LocalFSInputStream;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.external.util.FeedUtils;
+import org.apache.asterix.external.util.NodeResolverFactory;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.dataflow.std.file.FileSplit;
+
+/**
+ * Factory for {@link LocalFSInputStream}s over files that live on cluster nodes.
+ * <p>
+ * The {@code path} configuration value ({@link ExternalDataConstants#KEY_PATH}) is a comma-separated list of
+ * {@code host://absolute/file/path} entries. Each entry becomes one {@link FileSplit}, and the host of each entry
+ * is resolved through the configured {@link INodeResolver} to one partition location.
+ */
+public class LocalFSInputStreamFactory implements IInputStreamFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    protected static final INodeResolver DEFAULT_NODE_RESOLVER = new NodeResolverFactory().createNodeResolver();
+    protected static final Logger LOGGER = Logger.getLogger(LocalFSInputStreamFactory.class.getName());
+    // volatile: lazily published through double-checked locking in getNodeResolver(); without it another
+    // thread may observe a non-null reference to a not-yet-fully-constructed resolver
+    protected static volatile INodeResolver nodeResolver;
+    protected Map<String, String> configuration;
+    protected FileSplit[] inputFileSplits;
+    protected FileSplit[] feedLogFileSplits; // paths where instances of this feed can use as log storage
+    protected boolean isFeed;
+    protected String expression;
+    // transient fields (They don't need to be serialized and transferred)
+    private transient AlgebricksAbsolutePartitionConstraint constraints;
+
+    /**
+     * Creates the input stream for {@code partition}; all configured splits, the configuration, the optional
+     * expression and the feed flag are handed to {@link LocalFSInputStream}.
+     *
+     * @throws HyracksDataException wrapping any {@link IOException} raised while creating the stream
+     */
+    @Override
+    public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition) throws HyracksDataException {
+        try {
+            return new LocalFSInputStream(inputFileSplits, ctx, configuration, partition, expression, isFeed);
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    @Override
+    public DataSourceType getDataSourceType() {
+        return DataSourceType.STREAM;
+    }
+
+    @Override
+    public boolean isIndexible() {
+        return false;
+    }
+
+    /**
+     * Parses the {@code path} entries into file splits, computes the partition constraint and, when the source is
+     * a feed that keeps its data source open, the feed log splits.
+     *
+     * @throws AsterixException if the {@code path} parameter is missing, an entry is malformed, or a host cannot
+     *             be resolved to a node
+     */
+    @Override
+    public void configure(Map<String, String> configuration) throws AsterixException {
+        this.configuration = configuration;
+        String pathValue = configuration.get(ExternalDataConstants.KEY_PATH);
+        if (pathValue == null) {
+            // previously surfaced as a NullPointerException from split()
+            throw new AsterixException(
+                    "'" + ExternalDataConstants.KEY_PATH + "' parameter not specified as part of adapter configuration");
+        }
+        configureFileSplits(pathValue.split(","));
+        configurePartitionConstraint();
+        this.isFeed = ExternalDataUtils.isFeed(configuration) && ExternalDataUtils.keepDataSourceOpen(configuration);
+        if (isFeed) {
+            // depends on constraints, so it must run after configurePartitionConstraint()
+            feedLogFileSplits = FeedUtils.splitsForAdapter(ExternalDataUtils.getDataverse(configuration),
+                    ExternalDataUtils.getFeedName(configuration), constraints);
+        }
+        this.expression = configuration.get(ExternalDataConstants.KEY_EXPRESSION);
+    }
+
+    @Override
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
+        return constraints;
+    }
+
+    /**
+     * Builds {@link #inputFileSplits} from {@code host://path} entries, unless splits were already set.
+     * The node name is the text before the first ':' and the local path is everything after the first "://".
+     */
+    private void configureFileSplits(String[] splits) throws AsterixException {
+        if (inputFileSplits != null) {
+            return;
+        }
+        inputFileSplits = new FileSplit[splits.length];
+        for (int i = 0; i < splits.length; i++) {
+            String splitPath = splits[i];
+            String trimmedValue = splitPath.trim();
+            int separator = trimmedValue.indexOf("://");
+            // substring (rather than split("://")[1]) keeps paths containing "://" intact and avoids an
+            // ArrayIndexOutOfBoundsException for an entry such as "host://"
+            String nodeLocalPath = separator < 0 ? "" : trimmedValue.substring(separator + 3);
+            if (separator <= 0 || nodeLocalPath.isEmpty()) {
+                throw new AsterixException(
+                        "Invalid path: " + splitPath + "\nUsage- path=\"Host://Absolute File Path\"");
+            }
+            String nodeName = trimmedValue.substring(0, trimmedValue.indexOf(':'));
+            inputFileSplits[i] = new FileSplit(nodeName, new FileReference(new File(nodeLocalPath)));
+        }
+    }
+
+    /** Resolves the node of every split to a location, one location per split, in split order. */
+    private void configurePartitionConstraint() throws AsterixException {
+        String[] locs = new String[inputFileSplits.length];
+        for (int i = 0; i < inputFileSplits.length; i++) {
+            locs[i] = getNodeResolver().resolveNode(inputFileSplits[i].getNodeName());
+        }
+        constraints = new AlgebricksAbsolutePartitionConstraint(locs);
+    }
+
+    /**
+     * Returns the shared node resolver, creating it on first use (double-checked locking on the volatile
+     * {@link #nodeResolver} field, guarded by {@link #DEFAULT_NODE_RESOLVER}).
+     */
+    protected INodeResolver getNodeResolver() {
+        INodeResolver resolver = nodeResolver;
+        if (resolver == null) {
+            synchronized (DEFAULT_NODE_RESOLVER) {
+                resolver = nodeResolver;
+                if (resolver == null) {
+                    resolver = initializeNodeResolver();
+                    nodeResolver = resolver;
+                }
+            }
+        }
+        return resolver;
+    }
+
+    /**
+     * Instantiates the {@link INodeResolverFactory} named by the system property
+     * {@link ExternalDataConstants#NODE_RESOLVER_FACTORY_PROPERTY}; falls back to {@link #DEFAULT_NODE_RESOLVER}
+     * when the property is unset or the factory cannot be created.
+     */
+    private static INodeResolver initializeNodeResolver() {
+        String configuredNodeResolverFactory = System.getProperty(ExternalDataConstants.NODE_RESOLVER_FACTORY_PROPERTY);
+        if (configuredNodeResolverFactory == null) {
+            return DEFAULT_NODE_RESOLVER;
+        }
+        try {
+            return ((INodeResolverFactory) (Class.forName(configuredNodeResolverFactory).newInstance()))
+                    .createNodeResolver();
+        } catch (Exception e) {
+            // deliberate best-effort fallback; log the exception itself so its stack trace/cause is preserved
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.log(Level.WARNING,
+                        "Unable to create node resolver from the configured classname " + configuredNodeResolverFactory,
+                        e);
+            }
+            return DEFAULT_NODE_RESOLVER;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/121e1d9a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java
deleted file mode 100644
index 54ee780..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.stream.factory;
-
-import java.io.File;
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.external.api.IInputStreamProvider;
-import org.apache.asterix.external.api.IInputStreamProviderFactory;
-import org.apache.asterix.external.api.INodeResolver;
-import org.apache.asterix.external.api.INodeResolverFactory;
-import org.apache.asterix.external.input.stream.provider.LocalFSInputStreamProvider;
-import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.ExternalDataUtils;
-import org.apache.asterix.external.util.FeedUtils;
-import org.apache.asterix.external.util.NodeResolverFactory;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.dataflow.std.file.FileSplit;
-
-/**
- * Factory for {@link LocalFSInputStreamProvider}s. The {@code path} configuration value is a comma-separated list
- * of {@code host://absolute/file/path} entries; each entry becomes one {@link FileSplit} whose host is resolved to
- * one partition location.
- */
-public class LocalFSInputStreamProviderFactory implements IInputStreamProviderFactory {
-
-    private static final long serialVersionUID = 1L;
-
-    protected static final INodeResolver DEFAULT_NODE_RESOLVER = new NodeResolverFactory().createNodeResolver();
-    protected static final Logger LOGGER = Logger.getLogger(LocalFSInputStreamProviderFactory.class.getName());
-    // NOTE(review): lazily initialized via double-checked locking in getNodeResolver() but not declared
-    // volatile, so safe publication to other threads is not guaranteed by the Java memory model.
-    protected static INodeResolver nodeResolver;
-    protected Map<String, String> configuration;
-    protected FileSplit[] inputFileSplits;
-    protected FileSplit[] feedLogFileSplits; // paths where instances of this feed can use as log storage
-    protected boolean isFeed;
-    protected String expression;
-    // transient fields (They don't need to be serialized and transferred)
-    private transient AlgebricksAbsolutePartitionConstraint constraints;
-
-    /** Creates a provider for {@code partition} over all configured splits. */
-    @Override
-    public IInputStreamProvider createInputStreamProvider(IHyracksTaskContext ctx, int partition) {
-        return new LocalFSInputStreamProvider(inputFileSplits, ctx, configuration, partition, expression, isFeed);
-    }
-
-    @Override
-    public DataSourceType getDataSourceType() {
-        return DataSourceType.STREAM;
-    }
-
-    @Override
-    public boolean isIndexible() {
-        return false;
-    }
-
-    /**
-     * Parses the {@code path} entries, computes the partition constraint and, for feeds that keep the data source
-     * open, the feed log splits. NOTE(review): a missing {@code path} parameter surfaces as a NullPointerException.
-     */
-    @Override
-    public void configure(Map<String, String> configuration) throws AsterixException {
-        this.configuration = configuration;
-        String[] splits = configuration.get(ExternalDataConstants.KEY_PATH).split(",");
-        configureFileSplits(splits);
-        configurePartitionConstraint();
-        this.isFeed = ExternalDataUtils.isFeed(configuration) && ExternalDataUtils.keepDataSourceOpen(configuration);
-        if (isFeed) {
-            feedLogFileSplits = FeedUtils.splitsForAdapter(ExternalDataUtils.getDataverse(configuration),
-                    ExternalDataUtils.getFeedName(configuration), constraints);
-        }
-        this.expression = configuration.get(ExternalDataConstants.KEY_EXPRESSION);
-    }
-
-    @Override
-    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
-        return constraints;
-    }
-
-    /**
-     * Builds {@link #inputFileSplits} from {@code host://path} entries unless already set. The node name is the
-     * text before the first ':'; the path is the second "://"-separated token.
-     */
-    private void configureFileSplits(String[] splits) throws AsterixException {
-        if (inputFileSplits == null) {
-            inputFileSplits = new FileSplit[splits.length];
-            String nodeName;
-            String nodeLocalPath;
-            int count = 0;
-            String trimmedValue;
-            for (String splitPath : splits) {
-                trimmedValue = splitPath.trim();
-                if (!trimmedValue.contains("://")) {
-                    throw new AsterixException(
-                            "Invalid path: " + splitPath + "\nUsage- path=\"Host://Absolute File Path\"");
-                }
-                nodeName = trimmedValue.split(":")[0];
-                nodeLocalPath = trimmedValue.split("://")[1];
-                FileSplit fileSplit = new FileSplit(nodeName, new FileReference(new File(nodeLocalPath)));
-                inputFileSplits[count++] = fileSplit;
-            }
-        }
-    }
-
-    /** Resolves each split's node name to a location, one per split, in split order. */
-    private void configurePartitionConstraint() throws AsterixException {
-        String[] locs = new String[inputFileSplits.length];
-        String location;
-        for (int i = 0; i < inputFileSplits.length; i++) {
-            location = getNodeResolver().resolveNode(inputFileSplits[i].getNodeName());
-            locs[i] = location;
-        }
-        constraints = new AlgebricksAbsolutePartitionConstraint(locs);
-    }
-
-    /** Returns the shared node resolver, creating it on first use while holding DEFAULT_NODE_RESOLVER's lock. */
-    protected INodeResolver getNodeResolver() {
-        if (nodeResolver == null) {
-            synchronized (DEFAULT_NODE_RESOLVER) {
-                if (nodeResolver == null) {
-                    nodeResolver = initializeNodeResolver();
-                }
-            }
-        }
-        return nodeResolver;
-    }
-
-    /**
-     * Instantiates the resolver factory named by the NODE_RESOLVER_FACTORY_PROPERTY system property, falling back
-     * to DEFAULT_NODE_RESOLVER when it is unset or cannot be created (only the exception message is logged).
-     */
-    private static INodeResolver initializeNodeResolver() {
-        INodeResolver nodeResolver = null;
-        String configuredNodeResolverFactory = System.getProperty(ExternalDataConstants.NODE_RESOLVER_FACTORY_PROPERTY);
-        if (configuredNodeResolverFactory != null) {
-            try {
-                nodeResolver = ((INodeResolverFactory) (Class.forName(configuredNodeResolverFactory).newInstance()))
-                        .createNodeResolver();
-
-            } catch (Exception e) {
-                if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.log(Level.WARNING, "Unable to create node resolver from the configured classname "
-                            + configuredNodeResolverFactory + "\n" + e.getMessage());
-                }
-                nodeResolver = DEFAULT_NODE_RESOLVER;
-            }
-        } else {
-            nodeResolver = DEFAULT_NODE_RESOLVER;
-        }
-        return nodeResolver;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/121e1d9a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamFactory.java
new file mode 100644
index 0000000..1eb760e
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamFactory.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream.factory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.api.IExternalDataSourceFactory;
+import org.apache.asterix.external.api.IInputStreamFactory;
+import org.apache.asterix.external.input.stream.SocketClientInputStream;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.http.impl.conn.SystemDefaultDnsResolver;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Factory for {@link SocketClientInputStream}s. The {@code sockets} configuration value is a comma-separated list
+ * of {@code host:port} entries; hosts are resolved to IP addresses at configuration time, and partition {@code i}
+ * gets a stream for the i-th entry.
+ */
+public class SocketClientInputStreamFactory implements IInputStreamFactory {
+
+    private static final long serialVersionUID = 1L;
+    private transient AlgebricksAbsolutePartitionConstraint clusterLocations;
+    private List<Pair<String, Integer>> sockets;
+
+    /**
+     * Returns the partition constraint obtained from {@link IExternalDataSourceFactory#getPartitionConstraints},
+     * requesting as many partitions as there are configured sockets; the previously returned value is passed back
+     * in on each call.
+     */
+    @Override
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
+        clusterLocations = IExternalDataSourceFactory.getPartitionConstraints(clusterLocations, sockets.size());
+        return clusterLocations;
+    }
+
+    /**
+     * Parses and resolves the {@code sockets} entries.
+     *
+     * @throws AsterixException if a host cannot be resolved
+     * @throws IllegalArgumentException if {@code sockets} is missing or an entry is not of the form host:port
+     */
+    @Override
+    public void configure(Map<String, String> configuration) throws AsterixException {
+        try {
+            this.sockets = new ArrayList<>();
+            String socketsValue = configuration.get(ExternalDataConstants.KEY_SOCKETS);
+            if (socketsValue == null) {
+                throw new IllegalArgumentException(
+                        "\'sockets\' parameter not specified as part of adapter configuration");
+            }
+            for (String socket : socketsValue.split(",")) {
+                sockets.add(parseSocket(socket));
+            }
+        } catch (UnknownHostException e) {
+            throw new AsterixException(e);
+        }
+    }
+
+    /**
+     * Parses one {@code host:port} entry and resolves {@code host} to its first IP address.
+     * Malformed entries are reported with a descriptive IllegalArgumentException instead of an
+     * ArrayIndexOutOfBoundsException or a bare NumberFormatException.
+     */
+    private static Pair<String, Integer> parseSocket(String socket) throws UnknownHostException {
+        String[] socketTokens = socket.split(":");
+        if (socketTokens.length != 2) {
+            throw new IllegalArgumentException("Invalid socket " + socket.trim() + ", expected host:port");
+        }
+        String host = socketTokens[0].trim();
+        int port;
+        try {
+            port = Integer.parseInt(socketTokens[1].trim());
+        } catch (NumberFormatException e) {
+            throw new IllegalArgumentException("Invalid port in socket " + socket.trim(), e);
+        }
+        InetAddress[] resolved = SystemDefaultDnsResolver.INSTANCE.resolve(host);
+        return new Pair<String, Integer>(resolved[0].getHostAddress(), port);
+    }
+
+    /**
+     * Creates the stream for the socket at index {@code partition}.
+     *
+     * @throws HyracksDataException wrapping any {@link IOException} raised while creating the stream
+     */
+    @Override
+    public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition) throws HyracksDataException {
+        try {
+            return new SocketClientInputStream(sockets.get(partition));
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/121e1d9a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamProviderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamProviderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamProviderFactory.java
deleted file mode 100644
index 5e84123..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamProviderFactory.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.stream.factory;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.external.api.IExternalDataSourceFactory;
-import org.apache.asterix.external.api.IInputStreamProvider;
-import org.apache.asterix.external.api.IInputStreamProviderFactory;
-import org.apache.asterix.external.input.stream.provider.SocketClientInputStreamProvider;
-import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.http.impl.conn.SystemDefaultDnsResolver;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-/**
- * Factory for {@link SocketClientInputStreamProvider}s. The {@code sockets} configuration value is a
- * comma-separated list of {@code host:port} entries; hosts are resolved to IP addresses at configuration time and
- * partition {@code i} gets a provider for the i-th entry.
- */
-public class SocketClientInputStreamProviderFactory implements IInputStreamProviderFactory {
-
-    private static final long serialVersionUID = 1L;
-    private transient AlgebricksAbsolutePartitionConstraint clusterLocations;
-    private List<Pair<String, Integer>> sockets;
-
-    /** Requests as many partitions as there are configured sockets, passing back the previously returned value. */
-    @Override
-    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
-        clusterLocations = IExternalDataSourceFactory.getPartitionConstraints(clusterLocations, sockets.size());
-        return clusterLocations;
-    }
-
-    /**
-     * Parses and resolves the {@code sockets} entries. Only UnknownHostException is wrapped in AsterixException;
-     * NOTE(review): an entry without ':' surfaces as ArrayIndexOutOfBoundsException and a bad port as
-     * NumberFormatException.
-     */
-    @Override
-    public void configure(Map<String, String> configuration) throws AsterixException {
-        try {
-            this.sockets = new ArrayList<Pair<String, Integer>>();
-            String socketsValue = configuration.get(ExternalDataConstants.KEY_SOCKETS);
-            if (socketsValue == null) {
-                throw new IllegalArgumentException(
-                        "\'sockets\' parameter not specified as part of adapter configuration");
-            }
-            String[] socketsArray = socketsValue.split(",");
-            for (String socket : socketsArray) {
-                String[] socketTokens = socket.split(":");
-                String host = socketTokens[0].trim();
-                int port = Integer.parseInt(socketTokens[1].trim());
-                InetAddress[] resolved;
-                resolved = SystemDefaultDnsResolver.INSTANCE.resolve(host);
-                Pair<String, Integer> p = new Pair<String, Integer>(resolved[0].getHostAddress(), port);
-                sockets.add(p);
-            }
-        } catch (UnknownHostException e) {
-            throw new AsterixException(e);
-        }
-    }
-
-    /** Creates a provider for the socket at index {@code partition}. */
-    @Override
-    public IInputStreamProvider createInputStreamProvider(IHyracksTaskContext ctx, int partition)
-            throws HyracksDataException {
-        return new SocketClientInputStreamProvider(sockets.get(partition));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/121e1d9a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java
new file mode 100644
index 0000000..f63b895
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream.factory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.api.IInputStreamFactory;
+import org.apache.asterix.external.input.stream.SocketServerInputStream;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.om.util.AsterixRuntimeUtil;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class SocketServerInputStreamFactory implements IInputStreamFactory {
+
+    private static final long serialVersionUID = 1L;
+    private List<Pair<String, Integer>> sockets;
+    private Mode mode = Mode.IP;
+
+    public static enum Mode {
+        NC,
+        IP
+    }
+
+    @Override
+    public void configure(Map<String, String> configuration) throws AsterixException {
+        try {
+            sockets = new ArrayList<Pair<String, Integer>>();
+            String modeValue = configuration.get(ExternalDataConstants.KEY_MODE);
+            if (modeValue != null) {
+                mode = Mode.valueOf(modeValue.trim().toUpperCase());
+            }
+            String socketsValue = configuration.get(ExternalDataConstants.KEY_SOCKETS);
+            if (socketsValue == null) {
+                throw new IllegalArgumentException(
+                        "\'sockets\' parameter not specified as part of adapter configuration");
+            }
+            Map<InetAddress, Set<String>> ncMap;
+            ncMap = AsterixRuntimeUtil.getNodeControllerMap();
+            List<String> ncs = AsterixRuntimeUtil.getAllNodeControllers();
+            String[] socketsArray = socketsValue.split(",");
+            Random random = new Random();
+            for (String socket : socketsArray) {
+                String[] socketTokens = socket.split(":");
+                String host = socketTokens[0].trim();
+                int port = Integer.parseInt(socketTokens[1].trim());
+                Pair<String, Integer> p = null;
+                switch (mode) {
+                    case IP:
+                        Set<String> ncsOnIp = ncMap.get(InetAddress.getByName(host));
+                        if ((ncsOnIp == null) || ncsOnIp.isEmpty()) {
+                            throw new IllegalArgumentException("Invalid host " + host
+                                    + " as it is not part of the AsterixDB cluster. Valid choices are "
+                                    + StringUtils.join(ncMap.keySet(), ", "));
+                        }
+                        String[] ncArray = ncsOnIp.toArray(new String[] {});
+                        String nc = ncArray[random.nextInt(ncArray.length)];
+                        p = new Pair<String, Integer>(nc, port);
+                        break;
+
+                    case NC:
+                        p = new Pair<String, Integer>(host, port);
+                        if (!ncs.contains(host)) {
+                            throw new IllegalArgumentException("Invalid NC " + host
+                                    + " as it is not part of the AsterixDB cluster. Valid choices are "
+                                    + StringUtils.join(ncs, ", "));
+
+                        }
+                        break;
+                }
+                sockets.add(p);
+            }
+        } catch (Exception e) {
+            throw new AsterixException(e);
+        }
+    }
+
+    @Override
+    public synchronized AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition)
+            throws HyracksDataException {
+        try {
+            Pair<String, Integer> socket = sockets.get(partition);
+            ServerSocket server;
+            server = new ServerSocket(socket.second);
+            return new SocketServerInputStream(server);
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    @Override
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
+        List<String> locations = new ArrayList<String>();
+        for (Pair<String, Integer> socket : sockets) {
+            locations.add(socket.first);
+        }
+        return new AlgebricksAbsolutePartitionConstraint(locations.toArray(new String[] {}));
+    }
+
+    public List<Pair<String, Integer>> getSockets() {
+        return sockets;
+    }
+
+    @Override
+    public DataSourceType getDataSourceType() {
+        return DataSourceType.STREAM;
+    }
+
+    @Override
+    public boolean isIndexible() {
+        return false;
+    }
+}



Mime
View raw message