asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject [09/19] incubator-asterixdb git commit: Support Change Feeds and Ingestion of Records with MetaData
Date Tue, 15 Mar 2016 23:36:25 GMT
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
index d61dc5c..d55ac87 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
@@ -19,34 +19,50 @@
 package org.apache.asterix.external.input.record.reader.stream;
 
 import java.io.IOException;
-import java.util.Map;
 
+import org.apache.asterix.external.api.IExternalIndexer;
+import org.apache.asterix.external.input.stream.AInputStream;
 import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class LineRecordReader extends AbstractStreamRecordReader {
 
     protected boolean prevCharCR;
     protected int newlineLength;
     protected int recordNumber = 0;
-    private boolean configured = false;
+
+    public LineRecordReader(final boolean hasHeader, final AInputStream stream, final IExternalIndexer indexer)
+            throws HyracksDataException {
+        super(stream, indexer);
+        try {
+            if (hasHeader) {
+                if (hasNext()) {
+                    next();
+                }
+            }
+        } catch (final IOException e) {
+            throw new HyracksDataException(e);
+        }
+
+    }
 
     @Override
     public boolean hasNext() throws IOException {
         if (done) {
             return false;
         }
-        /* We're reading data from in, but the head of the stream may be
+        /*
+         * We're reading data from in, but the head of the stream may be
          * already buffered in buffer, so we have several cases:
          * 1. No newline characters are in the buffer, so we need to copy
-         *    everything and read another buffer from the stream.
+         *   everything and read another buffer from the stream.
          * 2. An unambiguously terminated line is in buffer, so we just
          *    copy to record.
          * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
-         *    in CR.  In this case we copy everything up to CR to record, but
-         *    we also need to see what follows CR: if it's LF, then we
-         *    need consume LF as well, so next call to readLine will read
-         *    from after that.
+         *    in CR. In this case we copy everything up to CR to record, but
+         * we also need to see what follows CR: if it's LF, then we
+         * need consume LF as well, so next call to readLine will read
+         * from after that.
          * We use a flag prevCharCR to signal if previous character was CR
          * and, if it happens to be at the end of the buffer, delay
          * consuming it until we have a chance to look at the char that
@@ -95,17 +111,4 @@ public class LineRecordReader extends AbstractStreamRecordReader {
         recordNumber++;
         return true;
     }
-
-    @Override
-    public void configure(Map<String, String> configuration) throws Exception {
-        if (!configured) {
-            super.configure(configuration);
-            if (ExternalDataUtils.hasHeader(configuration)) {
-                if (hasNext()) {
-                    next();
-                }
-            }
-        }
-        configured = true;
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReaderFactory.java
index f0867d3..68f10f6 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReaderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReaderFactory.java
@@ -18,26 +18,30 @@
  */
 package org.apache.asterix.external.input.record.reader.stream;
 
-import java.util.Map;
-
+import org.apache.asterix.external.api.IExternalIndexer;
 import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.input.stream.AInputStream;
 import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class LineRecordReaderFactory extends AbstractStreamRecordReaderFactory<char[]> {
 
     private static final long serialVersionUID = 1L;
 
     @Override
-    public IRecordReader<? extends char[]> createRecordReader(IHyracksTaskContext ctx, int partition) throws Exception {
+    public IRecordReader<? extends char[]> createRecordReader(IHyracksTaskContext ctx, int partition)
+            throws HyracksDataException {
         String quoteString = configuration.get(ExternalDataConstants.KEY_QUOTE);
-        LineRecordReader recordReader;
+        boolean hasHeader = ExternalDataUtils.hasHeader(configuration);
+        Pair<AInputStream, IExternalIndexer> streamAndIndexer = getStreamAndIndexer(ctx, partition);
         if (quoteString != null) {
-            recordReader = new QuotedLineRecordReader();
+            return new QuotedLineRecordReader(hasHeader, streamAndIndexer.first, streamAndIndexer.second, quoteString);
         } else {
-            recordReader = new LineRecordReader();
+            return new LineRecordReader(hasHeader, streamAndIndexer.first, streamAndIndexer.second);
         }
-        return configureReader(recordReader, ctx, partition);
     }
 
     @Override
@@ -45,8 +49,4 @@ public class LineRecordReaderFactory extends AbstractStreamRecordReaderFactory<c
         return char[].class;
     }
 
-    @Override
-    protected void configureStreamReaderFactory(Map<String, String> configuration) throws Exception {
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
index a8eb07b..6266aa2 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
@@ -19,24 +19,24 @@
 package org.apache.asterix.external.input.record.reader.stream;
 
 import java.io.IOException;
-import java.util.Map;
 
-import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IExternalIndexer;
+import org.apache.asterix.external.input.stream.AInputStream;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataExceptionUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class QuotedLineRecordReader extends LineRecordReader {
 
-    private char quote;
+    private final char quote;
     private boolean prevCharEscape;
     private boolean inQuote;
 
-    @Override
-    public void configure(Map<String, String> configuration) throws Exception {
-        super.configure(configuration);
-        String quoteString = configuration.get(ExternalDataConstants.KEY_QUOTE);
-        if (quoteString == null || quoteString.length() != 1) {
-            throw new AsterixException(ExternalDataExceptionUtils.incorrectParameterMessage(
+    public QuotedLineRecordReader(final boolean hasHeader, final AInputStream stream, final IExternalIndexer indexer,
+            final String quoteString) throws HyracksDataException {
+        super(hasHeader, stream, indexer);
+        if ((quoteString == null) || (quoteString.length() != 1)) {
+            throw new HyracksDataException(ExternalDataExceptionUtils.incorrectParameterMessage(
                     ExternalDataConstants.KEY_QUOTE, ExternalDataConstants.PARAMETER_OF_SIZE_ONE, quoteString));
         }
         this.quote = quoteString.charAt(0);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
index f41486e..678dd03 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
@@ -19,9 +19,10 @@
 package org.apache.asterix.external.input.record.reader.stream;
 
 import java.io.IOException;
-import java.util.Map;
 
 import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IExternalIndexer;
+import org.apache.asterix.external.input.stream.AInputStream;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataExceptionUtils;
 
@@ -34,15 +35,10 @@ public class SemiStructuredRecordReader extends AbstractStreamRecordReader {
     private char recordEnd;
     private int recordNumber = 0;
 
-    public int getRecordNumber() {
-        return recordNumber;
-    }
-
-    @Override
-    public void configure(Map<String, String> configuration) throws Exception {
-        super.configure(configuration);
-        String recStartString = configuration.get(ExternalDataConstants.KEY_RECORD_START);
-        String recEndString = configuration.get(ExternalDataConstants.KEY_RECORD_END);
+    public SemiStructuredRecordReader(AInputStream stream, IExternalIndexer indexer, String recStartString,
+            String recEndString) throws AsterixException {
+        super(stream, indexer);
+        // set record opening char
         if (recStartString != null) {
             if (recStartString.length() != 1) {
                 throw new AsterixException(
@@ -53,6 +49,7 @@ public class SemiStructuredRecordReader extends AbstractStreamRecordReader {
         } else {
             recordStart = ExternalDataConstants.DEFAULT_RECORD_START;
         }
+        // set record ending char
         if (recEndString != null) {
             if (recEndString.length() != 1) {
                 throw new AsterixException(
@@ -65,6 +62,10 @@ public class SemiStructuredRecordReader extends AbstractStreamRecordReader {
         }
     }
 
+    public int getRecordNumber() {
+        return recordNumber;
+    }
+
     @Override
     public boolean hasNext() throws Exception {
         if (done) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReaderFactory.java
index ec8eac9..206ae50 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReaderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReaderFactory.java
@@ -18,27 +18,34 @@
  */
 package org.apache.asterix.external.input.record.reader.stream;
 
-import java.util.Map;
-
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IExternalIndexer;
 import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.input.stream.AInputStream;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class SemiStructuredRecordReaderFactory extends AbstractStreamRecordReaderFactory<char[]> {
 
     private static final long serialVersionUID = 1L;
 
     @Override
-    public IRecordReader<? extends char[]> createRecordReader(IHyracksTaskContext ctx, int partition) throws Exception {
-        SemiStructuredRecordReader recordReader = new SemiStructuredRecordReader();
-        return configureReader(recordReader, ctx, partition);
+    public IRecordReader<? extends char[]> createRecordReader(IHyracksTaskContext ctx, int partition)
+            throws HyracksDataException {
+        Pair<AInputStream, IExternalIndexer> streamAndIndexer = getStreamAndIndexer(ctx, partition);
+        try {
+            return new SemiStructuredRecordReader(streamAndIndexer.first, streamAndIndexer.second,
+                    configuration.get(ExternalDataConstants.KEY_RECORD_START),
+                    configuration.get(ExternalDataConstants.KEY_RECORD_END));
+        } catch (AsterixException e) {
+            throw new HyracksDataException(e);
+        }
     }
 
     @Override
     public Class<? extends char[]> getRecordClass() {
         return char[].class;
     }
-
-    @Override
-    protected void configureStreamReaderFactory(Map<String, String> configuration) throws Exception {
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
index 617bc39..be9ce06 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
@@ -20,15 +20,12 @@ package org.apache.asterix.external.input.record.reader.twitter;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.asterix.external.api.IDataFlowController;
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.input.record.GenericRecord;
 import org.apache.asterix.external.util.FeedLogManager;
-import org.apache.asterix.external.util.TwitterUtil;
-import org.apache.asterix.external.util.TwitterUtil.SearchAPIConstants;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 import twitter4j.Query;
@@ -39,7 +36,6 @@ import twitter4j.TwitterException;
 
 public class TwitterPullRecordReader implements IRecordReader<Status> {
 
-    private String keywords;
     private Query query;
     private Twitter twitter;
     private int requestInterval = 5; // seconds
@@ -48,7 +44,12 @@ public class TwitterPullRecordReader implements IRecordReader<Status> {
     private long lastTweetIdReceived = 0;
     private GenericRecord<Status> record;
 
-    public TwitterPullRecordReader() {
+    public TwitterPullRecordReader(Twitter twitter, String keywords, int requestInterval) {
+        this.twitter = twitter;
+        this.requestInterval = requestInterval;
+        this.query = new Query(keywords);
+        this.query.setCount(100);
+        this.record = new GenericRecord<Status>();
     }
 
     @Override
@@ -56,16 +57,6 @@ public class TwitterPullRecordReader implements IRecordReader<Status> {
     }
 
     @Override
-    public void configure(Map<String, String> configuration) throws Exception {
-        twitter = TwitterUtil.getTwitterService(configuration);
-        keywords = configuration.get(SearchAPIConstants.QUERY);
-        requestInterval = Integer.parseInt(configuration.get(SearchAPIConstants.INTERVAL));
-        query = new Query(keywords);
-        query.setCount(100);
-        record = new GenericRecord<Status>();
-    }
-
-    @Override
     public boolean hasNext() throws Exception {
         return true;
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
index 19f156c..64695b5 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
@@ -19,7 +19,6 @@
 package org.apache.asterix.external.input.record.reader.twitter;
 
 import java.io.IOException;
-import java.util.Map;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.asterix.external.api.IDataFlowController;
@@ -27,7 +26,6 @@ import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.input.record.GenericRecord;
 import org.apache.asterix.external.util.FeedLogManager;
-import org.apache.asterix.external.util.TwitterUtil;
 
 import twitter4j.FilterQuery;
 import twitter4j.StallWarning;
@@ -42,6 +40,22 @@ public class TwitterPushRecordReader implements IRecordReader<Status> {
     private GenericRecord<Status> record;
     private boolean closed = false;
 
+    public TwitterPushRecordReader(TwitterStream twitterStream, FilterQuery query) {
+        record = new GenericRecord<Status>();
+        inputQ = new LinkedBlockingQueue<Status>();
+        this.twitterStream = twitterStream;//TwitterUtil.getTwitterStream(configuration);
+        this.twitterStream.addListener(new TweetListener(inputQ));
+        this.twitterStream.filter(query);
+    }
+
+    public TwitterPushRecordReader(TwitterStream twitterStream) {
+        record = new GenericRecord<Status>();
+        inputQ = new LinkedBlockingQueue<Status>();
+        this.twitterStream = twitterStream;//
+        this.twitterStream.addListener(new TweetListener(inputQ));
+        twitterStream.sample();
+    }
+
     @Override
     public void close() throws IOException {
         if (!closed) {
@@ -53,20 +67,6 @@ public class TwitterPushRecordReader implements IRecordReader<Status> {
     }
 
     @Override
-    public void configure(Map<String, String> configuration) throws Exception {
-        record = new GenericRecord<Status>();
-        inputQ = new LinkedBlockingQueue<Status>();
-        twitterStream = TwitterUtil.getTwitterStream(configuration);
-        twitterStream.addListener(new TweetListener(inputQ));
-        FilterQuery query = TwitterUtil.getFilterQuery(configuration);
-        if (query != null) {
-            twitterStream.filter(query);
-        } else {
-            twitterStream.sample();
-        }
-    }
-
-    @Override
     public boolean hasNext() throws Exception {
         return !closed;
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
index a2a4742..7ca185f 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
@@ -33,7 +33,9 @@ import org.apache.asterix.external.util.TwitterUtil.AuthenticationConstants;
 import org.apache.asterix.external.util.TwitterUtil.SearchAPIConstants;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
+import twitter4j.FilterQuery;
 import twitter4j.Status;
 
 public class TwitterRecordReaderFactory implements IRecordReaderFactory<Status> {
@@ -54,13 +56,13 @@ public class TwitterRecordReaderFactory implements IRecordReaderFactory<Status>
     }
 
     @Override
-    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception {
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
         clusterLocations = IExternalDataSourceFactory.getPartitionConstraints(clusterLocations, INTAKE_CARDINALITY);
         return clusterLocations;
     }
 
     @Override
-    public void configure(Map<String, String> configuration) throws Exception {
+    public void configure(Map<String, String> configuration) throws AsterixException {
         this.configuration = configuration;
         TwitterUtil.initializeConfigurationWithAuthInfo(configuration);
         if (!validateConfiguration(configuration)) {
@@ -70,7 +72,7 @@ public class TwitterRecordReaderFactory implements IRecordReaderFactory<Status>
             builder.append(AuthenticationConstants.OAUTH_CONSUMER_SECRET + "\n");
             builder.append(AuthenticationConstants.OAUTH_ACCESS_TOKEN + "\n");
             builder.append(AuthenticationConstants.OAUTH_ACCESS_TOKEN_SECRET + "\n");
-            throw new Exception(builder.toString());
+            throw new AsterixException(builder.toString());
         }
         if (ExternalDataUtils.isPull(configuration)) {
             pull = true;
@@ -107,15 +109,22 @@ public class TwitterRecordReaderFactory implements IRecordReaderFactory<Status>
     }
 
     @Override
-    public IRecordReader<? extends Status> createRecordReader(IHyracksTaskContext ctx, int partition) throws Exception {
-        IRecordReader<Status> reader;
+    public IRecordReader<? extends Status> createRecordReader(IHyracksTaskContext ctx, int partition)
+            throws HyracksDataException {
         if (pull) {
-            reader = new TwitterPullRecordReader();
+            return new TwitterPullRecordReader(TwitterUtil.getTwitterService(configuration),
+                    configuration.get(SearchAPIConstants.QUERY),
+                    Integer.parseInt(configuration.get(SearchAPIConstants.INTERVAL)));
         } else {
-            reader = new TwitterPushRecordReader();
+            FilterQuery query;
+            try {
+                query = TwitterUtil.getFilterQuery(configuration);
+                return (query == null) ? new TwitterPushRecordReader(TwitterUtil.getTwitterStream(configuration))
+                        : new TwitterPushRecordReader(TwitterUtil.getTwitterStream(configuration), query);
+            } catch (AsterixException e) {
+                throw new HyracksDataException(e);
+            }
         }
-        reader.configure(configuration);
-        return reader;
     }
 
     @Override
@@ -128,7 +137,7 @@ public class TwitterRecordReaderFactory implements IRecordReaderFactory<Status>
         String consumerSecret = configuration.get(AuthenticationConstants.OAUTH_CONSUMER_SECRET);
         String accessToken = configuration.get(AuthenticationConstants.OAUTH_ACCESS_TOKEN);
         String tokenSecret = configuration.get(AuthenticationConstants.OAUTH_ACCESS_TOKEN_SECRET);
-        if (consumerKey == null || consumerSecret == null || accessToken == null || tokenSecret == null) {
+        if ((consumerKey == null) || (consumerSecret == null) || (accessToken == null) || (tokenSecret == null)) {
             return false;
         }
         return true;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStream.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStream.java
index 469e866..b78f96d 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStream.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStream.java
@@ -18,9 +18,7 @@
  */
 package org.apache.asterix.external.input.stream;
 
-import java.io.IOException;
 import java.io.InputStream;
-import java.util.Map;
 
 import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
 import org.apache.asterix.external.util.FeedLogManager;
@@ -30,8 +28,6 @@ public abstract class AInputStream extends InputStream {
 
     public abstract boolean stop() throws Exception;
 
-    public abstract void configure(Map<String, String> configuration) throws IOException;
-
     // TODO: Find a better way to send notifications
     public abstract void setController(AbstractFeedDataFlowController controller);
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java
index 89008aa..bf85330 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java
@@ -19,7 +19,7 @@
 package org.apache.asterix.external.input.stream;
 
 import java.io.IOException;
-import java.io.InputStreamReader;
+import java.io.Reader;
 import java.nio.ByteBuffer;
 import java.nio.CharBuffer;
 import java.nio.charset.CharsetDecoder;
@@ -29,7 +29,7 @@ import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.FeedLogManager;
 
-public class AInputStreamReader extends InputStreamReader {
+public class AInputStreamReader extends Reader {
     private AInputStream in;
     private byte[] bytes = new byte[ExternalDataConstants.DEFAULT_BUFFER_SIZE];
     private ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
@@ -38,7 +38,6 @@ public class AInputStreamReader extends InputStreamReader {
     private boolean done = false;
 
     public AInputStreamReader(AInputStream in) {
-        super(in);
         this.in = in;
         this.decoder = StandardCharsets.UTF_8.newDecoder();
         this.byteBuffer.flip();
@@ -74,22 +73,42 @@ public class AInputStreamReader extends InputStreamReader {
         if (done) {
             return -1;
         }
+        int len = 0;
         charBuffer.clear();
-        if (byteBuffer.hasRemaining()) {
+        while (charBuffer.position() == 0) {
+            if (byteBuffer.hasRemaining()) {
+                decoder.decode(byteBuffer, charBuffer, false);
+                System.arraycopy(charBuffer.array(), 0, cbuf, offset, charBuffer.position());
+                if (charBuffer.position() > 0) {
+                    return charBuffer.position();
+                } else {
+                    // need to read more data
+                    System.arraycopy(bytes, byteBuffer.position(), bytes, 0, byteBuffer.remaining());
+                    byteBuffer.position(byteBuffer.remaining());
+                    while (len == 0) {
+                        len = in.read(bytes, byteBuffer.position(), bytes.length - byteBuffer.position());
+                    }
+                }
+            } else {
+                byteBuffer.clear();
+                while (len == 0) {
+                    len = in.read(bytes, 0, bytes.length);
+                }
+            }
+            if (len == -1) {
+                done = true;
+                return len;
+            }
+            byteBuffer.position(len);
+            byteBuffer.flip();
             decoder.decode(byteBuffer, charBuffer, false);
             System.arraycopy(charBuffer.array(), 0, cbuf, offset, charBuffer.position());
-            return charBuffer.position();
         }
-        int len = in.read(bytes, 0, bytes.length);
-        if (len == -1) {
-            done = true;
-            return len;
-        }
-        byteBuffer.clear();
-        byteBuffer.position(len);
-        byteBuffer.flip();
-        decoder.decode(byteBuffer, charBuffer, false);
-        System.arraycopy(charBuffer.array(), 0, cbuf, offset, charBuffer.position());
         return charBuffer.position();
     }
+
+    @Override
+    public void close() throws IOException {
+        in.close();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/BasicInputStream.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/BasicInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/BasicInputStream.java
index 5b654eb..176f5f4 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/BasicInputStream.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/BasicInputStream.java
@@ -20,7 +20,6 @@ package org.apache.asterix.external.input.stream;
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.Map;
 
 import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
 import org.apache.asterix.external.util.FeedLogManager;
@@ -89,10 +88,6 @@ public class BasicInputStream extends AInputStream {
     }
 
     @Override
-    public void configure(Map<String, String> configuration) {
-    }
-
-    @Override
     public void setFeedLogManager(FeedLogManager logManager) {
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFileSystemInputStream.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFileSystemInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFileSystemInputStream.java
index 8dcd5b6..dc6a130 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFileSystemInputStream.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFileSystemInputStream.java
@@ -21,7 +21,6 @@ package org.apache.asterix.external.input.stream;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.nio.file.Path;
-import java.util.Map;
 
 import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
 import org.apache.asterix.external.util.ExternalDataConstants;
@@ -34,8 +33,10 @@ public class LocalFileSystemInputStream extends AInputStream {
     private FileInputStream in;
     private byte lastByte;
 
-    public LocalFileSystemInputStream(Path inputResource, String expression, boolean isFeed) throws IOException {
+    public LocalFileSystemInputStream(Path inputResource, String expression, boolean isFeed)
+            throws HyracksDataException {
         this.watcher = new FileSystemWatcher(inputResource, expression, isFeed);
+        watcher.init();
     }
 
     @Override
@@ -105,10 +106,10 @@ public class LocalFileSystemInputStream extends AInputStream {
             }
         }
         int result = in.read(b, off, len);
-        while (result < 0 && advance()) {
+        while ((result < 0) && advance()) {
             // return a new line at the end of every file <--Might create problems for some cases
             // depending on the parser implementation-->
-            if (lastByte != ExternalDataConstants.BYTE_LF && lastByte != ExternalDataConstants.BYTE_LF) {
+            if ((lastByte != ExternalDataConstants.BYTE_LF) && (lastByte != ExternalDataConstants.BYTE_LF)) {
                 lastByte = ExternalDataConstants.BYTE_LF;
                 b[off] = ExternalDataConstants.BYTE_LF;
                 return 1;
@@ -117,7 +118,7 @@ public class LocalFileSystemInputStream extends AInputStream {
             result = in.read(b, off, len);
         }
         if (result > 0) {
-            lastByte = b[off + result - 1];
+            lastByte = b[(off + result) - 1];
         }
         return result;
     }
@@ -133,9 +134,4 @@ public class LocalFileSystemInputStream extends AInputStream {
         watcher.close();
         return true;
     }
-
-    @Override
-    public void configure(Map<String, String> configuration) throws IOException {
-        watcher.init();
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStream.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStream.java
deleted file mode 100644
index 67c4493..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStream.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.stream;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.util.Map;
-
-import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
-import org.apache.asterix.external.util.ExternalDataExceptionUtils;
-import org.apache.asterix.external.util.FeedLogManager;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class SocketInputStream extends AInputStream {
-    private ServerSocket server;
-    private Socket socket;
-    private InputStream connectionStream;
-    private AbstractFeedDataFlowController controller;
-
-    public SocketInputStream(ServerSocket server) throws IOException {
-        this.server = server;
-        socket = new Socket();
-        connectionStream = new InputStream() {
-            @Override
-            public int read() throws IOException {
-                return -1;
-            }
-        };
-    }
-
-    @Override
-    public int read() throws IOException {
-        int read = connectionStream.read();
-        while (read < 0) {
-            accept();
-            read = connectionStream.read();
-        }
-        return read;
-    }
-
-    @Override
-    public boolean skipError() throws Exception {
-        accept();
-        return true;
-    }
-
-    @Override
-    public int read(byte b[]) throws IOException {
-        return read(b, 0, b.length);
-    }
-
-    @Override
-    public int read(byte b[], int off, int len) throws IOException {
-        if (server == null) {
-            return -1;
-        }
-        int read = -1;
-        try {
-            if (connectionStream.available() < 1) {
-                controller.flush();
-            }
-            read = connectionStream.read(b, off, len);
-        } catch (IOException e) {
-            e.printStackTrace();
-            read = -1;
-        }
-        while (read < 0) {
-            if (!accept()) {
-                return -1;
-            }
-            try {
-                read = connectionStream.read(b, off, len);
-            } catch (IOException e) {
-                e.printStackTrace();
-                read = -1;
-            }
-        }
-        return read;
-    }
-
-    @Override
-    public long skip(long n) throws IOException {
-        return 0;
-    }
-
-    @Override
-    public int available() throws IOException {
-        return 1;
-    }
-
-    @Override
-    public synchronized void close() throws IOException {
-        HyracksDataException hde = null;
-        try {
-            if (connectionStream != null) {
-                connectionStream.close();
-            }
-            connectionStream = null;
-        } catch (IOException e) {
-            hde = new HyracksDataException(e);
-        }
-        try {
-            if (socket != null) {
-                socket.close();
-            }
-            socket = null;
-        } catch (IOException e) {
-            hde = ExternalDataExceptionUtils.suppress(hde, e);
-        }
-        try {
-            if (server != null) {
-                server.close();
-            }
-        } catch (IOException e) {
-            hde = ExternalDataExceptionUtils.suppress(hde, e);
-        } finally {
-            server = null;
-        }
-        if (hde != null) {
-            throw hde;
-        }
-    }
-
-    private boolean accept() throws IOException {
-        try {
-            connectionStream.close();
-            connectionStream = null;
-            socket.close();
-            socket = null;
-            socket = server.accept();
-            connectionStream = socket.getInputStream();
-            return true;
-        } catch (Exception e) {
-            close();
-            return false;
-        }
-    }
-
-    @Override
-    public boolean stop() throws Exception {
-        close();
-        return true;
-    }
-
-    @Override
-    public void configure(Map<String, String> configuration) {
-    }
-
-    @Override
-    public void setFeedLogManager(FeedLogManager logManager) {
-    }
-
-    @Override
-    public void setController(AbstractFeedDataFlowController controller) {
-        this.controller = controller;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java
new file mode 100644
index 0000000..1c33709
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+
+import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+import org.apache.asterix.external.util.ExternalDataExceptionUtils;
+import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class SocketServerInputStream extends AInputStream {
+    private ServerSocket server;
+    private Socket socket;
+    private InputStream connectionStream;
+    private AbstractFeedDataFlowController controller;
+
+    public SocketServerInputStream(ServerSocket server) {
+        this.server = server;
+        socket = new Socket();
+        connectionStream = new InputStream() {
+            @Override
+            public int read() throws IOException {
+                return -1;
+            }
+        };
+    }
+
+    @Override
+    public int read() throws IOException {
+        int read = connectionStream.read();
+        while (read < 0) {
+            accept();
+            read = connectionStream.read();
+        }
+        return read;
+    }
+
+    @Override
+    public boolean skipError() throws Exception {
+        accept();
+        return true;
+    }
+
+    @Override
+    public int read(byte b[]) throws IOException {
+        return read(b, 0, b.length);
+    }
+
+    @Override
+    public int read(byte b[], int off, int len) throws IOException {
+        if (server == null) {
+            return -1;
+        }
+        int read = -1;
+        try {
+            if (connectionStream.available() < 1) {
+                controller.flush();
+            }
+            read = connectionStream.read(b, off, len);
+        } catch (IOException e) {
+            e.printStackTrace();
+            read = -1;
+        }
+        while (read < 0) {
+            if (!accept()) {
+                return -1;
+            }
+            try {
+                read = connectionStream.read(b, off, len);
+            } catch (IOException e) {
+                e.printStackTrace();
+                read = -1;
+            }
+        }
+        return read;
+    }
+
+    @Override
+    public long skip(long n) throws IOException {
+        return 0;
+    }
+
+    @Override
+    public int available() throws IOException {
+        return 1;
+    }
+
+    @Override
+    public synchronized void close() throws IOException {
+        HyracksDataException hde = null;
+        try {
+            if (connectionStream != null) {
+                connectionStream.close();
+            }
+            connectionStream = null;
+        } catch (IOException e) {
+            hde = new HyracksDataException(e);
+        }
+        try {
+            if (socket != null) {
+                socket.close();
+            }
+            socket = null;
+        } catch (IOException e) {
+            hde = ExternalDataExceptionUtils.suppress(hde, e);
+        }
+        try {
+            if (server != null) {
+                server.close();
+            }
+        } catch (IOException e) {
+            hde = ExternalDataExceptionUtils.suppress(hde, e);
+        } finally {
+            server = null;
+        }
+        if (hde != null) {
+            throw hde;
+        }
+    }
+
+    private boolean accept() throws IOException {
+        try {
+            connectionStream.close();
+            connectionStream = null;
+            socket.close();
+            socket = null;
+            socket = server.accept();
+            connectionStream = socket.getInputStream();
+            return true;
+        } catch (Exception e) {
+            close();
+            return false;
+        }
+    }
+
+    @Override
+    public boolean stop() throws Exception {
+        close();
+        return true;
+    }
+
+    @Override
+    public void setFeedLogManager(FeedLogManager logManager) {
+    }
+
+    @Override
+    public void setController(AbstractFeedDataFlowController controller) {
+        this.controller = controller;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java
index 5c1583e..54ee780 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java
@@ -47,15 +47,14 @@ public class LocalFSInputStreamProviderFactory implements IInputStreamProviderFa
     protected static INodeResolver nodeResolver;
     protected Map<String, String> configuration;
     protected FileSplit[] inputFileSplits;
-    protected FileSplit[] feedLogFileSplits; // paths where instances of this feed can use as log
-                                             // storage
+    protected FileSplit[] feedLogFileSplits; // paths where instances of this feed can use as log storage
     protected boolean isFeed;
     protected String expression;
     // transient fields (They don't need to be serialized and transferred)
     private transient AlgebricksAbsolutePartitionConstraint constraints;
 
     @Override
-    public IInputStreamProvider createInputStreamProvider(IHyracksTaskContext ctx, int partition) throws Exception {
+    public IInputStreamProvider createInputStreamProvider(IHyracksTaskContext ctx, int partition) {
         return new LocalFSInputStreamProvider(inputFileSplits, ctx, configuration, partition, expression, isFeed);
     }
 
@@ -70,7 +69,7 @@ public class LocalFSInputStreamProviderFactory implements IInputStreamProviderFa
     }
 
     @Override
-    public void configure(Map<String, String> configuration) throws Exception {
+    public void configure(Map<String, String> configuration) throws AsterixException {
         this.configuration = configuration;
         String[] splits = configuration.get(ExternalDataConstants.KEY_PATH).split(",");
         configureFileSplits(splits);
@@ -84,7 +83,7 @@ public class LocalFSInputStreamProviderFactory implements IInputStreamProviderFa
     }
 
     @Override
-    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception {
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
         return constraints;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamProviderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamProviderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamProviderFactory.java
new file mode 100644
index 0000000..5e84123
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamProviderFactory.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream.factory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IExternalDataSourceFactory;
+import org.apache.asterix.external.api.IInputStreamProvider;
+import org.apache.asterix.external.api.IInputStreamProviderFactory;
+import org.apache.asterix.external.input.stream.provider.SocketClientInputStreamProvider;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.http.impl.conn.SystemDefaultDnsResolver;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class SocketClientInputStreamProviderFactory implements IInputStreamProviderFactory {
+
+    private static final long serialVersionUID = 1L;
+    private transient AlgebricksAbsolutePartitionConstraint clusterLocations;
+    private List<Pair<String, Integer>> sockets;
+
+    @Override
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
+        clusterLocations = IExternalDataSourceFactory.getPartitionConstraints(clusterLocations, sockets.size());
+        return clusterLocations;
+    }
+
+    @Override
+    public void configure(Map<String, String> configuration) throws AsterixException {
+        try {
+            this.sockets = new ArrayList<Pair<String, Integer>>();
+            String socketsValue = configuration.get(ExternalDataConstants.KEY_SOCKETS);
+            if (socketsValue == null) {
+                throw new IllegalArgumentException(
+                        "\'sockets\' parameter not specified as part of adapter configuration");
+            }
+            String[] socketsArray = socketsValue.split(",");
+            for (String socket : socketsArray) {
+                String[] socketTokens = socket.split(":");
+                String host = socketTokens[0].trim();
+                int port = Integer.parseInt(socketTokens[1].trim());
+                InetAddress[] resolved;
+                resolved = SystemDefaultDnsResolver.INSTANCE.resolve(host);
+                Pair<String, Integer> p = new Pair<String, Integer>(resolved[0].getHostAddress(), port);
+                sockets.add(p);
+            }
+        } catch (UnknownHostException e) {
+            throw new AsterixException(e);
+        }
+    }
+
+    @Override
+    public IInputStreamProvider createInputStreamProvider(IHyracksTaskContext ctx, int partition)
+            throws HyracksDataException {
+        return new SocketClientInputStreamProvider(sockets.get(partition));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketInputStreamProviderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketInputStreamProviderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketInputStreamProviderFactory.java
deleted file mode 100644
index 6fdc42d..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketInputStreamProviderFactory.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.stream.factory;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.ServerSocket;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.external.api.IInputStreamProvider;
-import org.apache.asterix.external.api.IInputStreamProviderFactory;
-import org.apache.asterix.external.input.stream.provider.SocketInputStreamProvider;
-import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.om.util.AsterixRuntimeUtil;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-
-public class SocketInputStreamProviderFactory implements IInputStreamProviderFactory {
-
-    private static final long serialVersionUID = 1L;
-    private List<Pair<String, Integer>> sockets;
-    private Mode mode = Mode.IP;
-
-    public static enum Mode {
-        NC,
-        IP
-    }
-
-    @Override
-    public void configure(Map<String, String> configuration) throws Exception {
-        sockets = new ArrayList<Pair<String, Integer>>();
-        String modeValue = configuration.get(ExternalDataConstants.KEY_MODE);
-        if (modeValue != null) {
-            mode = Mode.valueOf(modeValue.trim().toUpperCase());
-        }
-        String socketsValue = configuration.get(ExternalDataConstants.KEY_SOCKETS);
-        if (socketsValue == null) {
-            throw new IllegalArgumentException("\'sockets\' parameter not specified as part of adapter configuration");
-        }
-        Map<InetAddress, Set<String>> ncMap = AsterixRuntimeUtil.getNodeControllerMap();
-        List<String> ncs = AsterixRuntimeUtil.getAllNodeControllers();
-        String[] socketsArray = socketsValue.split(",");
-        Random random = new Random();
-        for (String socket : socketsArray) {
-            String[] socketTokens = socket.split(":");
-            String host = socketTokens[0].trim();
-            int port = Integer.parseInt(socketTokens[1].trim());
-            Pair<String, Integer> p = null;
-            switch (mode) {
-                case IP:
-                    Set<String> ncsOnIp = ncMap.get(InetAddress.getByName(host));
-                    if (ncsOnIp == null || ncsOnIp.isEmpty()) {
-                        throw new IllegalArgumentException("Invalid host " + host
-                                + " as it is not part of the AsterixDB cluster. Valid choices are "
-                                + StringUtils.join(ncMap.keySet(), ", "));
-                    }
-                    String[] ncArray = ncsOnIp.toArray(new String[] {});
-                    String nc = ncArray[random.nextInt(ncArray.length)];
-                    p = new Pair<String, Integer>(nc, port);
-                    break;
-
-                case NC:
-                    p = new Pair<String, Integer>(host, port);
-                    if (!ncs.contains(host)) {
-                        throw new IllegalArgumentException(
-                                "Invalid NC " + host + " as it is not part of the AsterixDB cluster. Valid choices are "
-                                        + StringUtils.join(ncs, ", "));
-
-                    }
-                    break;
-            }
-            sockets.add(p);
-        }
-    }
-
-    @Override
-    public synchronized IInputStreamProvider createInputStreamProvider(IHyracksTaskContext ctx, int partition)
-            throws IOException, AsterixException {
-        Pair<String, Integer> socket = sockets.get(partition);
-        ServerSocket server = new ServerSocket(socket.second);
-        return new SocketInputStreamProvider(server);
-    }
-
-    @Override
-    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
-        List<String> locations = new ArrayList<String>();
-        for (Pair<String, Integer> socket : sockets) {
-            locations.add(socket.first);
-        }
-        return new AlgebricksAbsolutePartitionConstraint(locations.toArray(new String[] {}));
-    }
-
-    public List<Pair<String, Integer>> getSockets() {
-        return sockets;
-    }
-
-    @Override
-    public DataSourceType getDataSourceType() {
-        return DataSourceType.STREAM;
-    }
-
-    @Override
-    public boolean isIndexible() {
-        return false;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamProviderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamProviderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamProviderFactory.java
new file mode 100644
index 0000000..a301c1a
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamProviderFactory.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream.factory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IInputStreamProvider;
+import org.apache.asterix.external.api.IInputStreamProviderFactory;
+import org.apache.asterix.external.input.stream.provider.SocketServerInputStreamProvider;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.om.util.AsterixRuntimeUtil;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class SocketServerInputStreamProviderFactory implements IInputStreamProviderFactory {
+
+    private static final long serialVersionUID = 1L;
+    private List<Pair<String, Integer>> sockets;
+    private Mode mode = Mode.IP;
+
+    public static enum Mode {
+        NC,
+        IP
+    }
+
+    @Override
+    public void configure(Map<String, String> configuration) throws AsterixException {
+        try {
+            sockets = new ArrayList<Pair<String, Integer>>();
+            String modeValue = configuration.get(ExternalDataConstants.KEY_MODE);
+            if (modeValue != null) {
+                mode = Mode.valueOf(modeValue.trim().toUpperCase());
+            }
+            String socketsValue = configuration.get(ExternalDataConstants.KEY_SOCKETS);
+            if (socketsValue == null) {
+                throw new IllegalArgumentException(
+                        "\'sockets\' parameter not specified as part of adapter configuration");
+            }
+            Map<InetAddress, Set<String>> ncMap;
+            ncMap = AsterixRuntimeUtil.getNodeControllerMap();
+            List<String> ncs = AsterixRuntimeUtil.getAllNodeControllers();
+            String[] socketsArray = socketsValue.split(",");
+            Random random = new Random();
+            for (String socket : socketsArray) {
+                String[] socketTokens = socket.split(":");
+                String host = socketTokens[0].trim();
+                int port = Integer.parseInt(socketTokens[1].trim());
+                Pair<String, Integer> p = null;
+                switch (mode) {
+                    case IP:
+                        Set<String> ncsOnIp = ncMap.get(InetAddress.getByName(host));
+                        if ((ncsOnIp == null) || ncsOnIp.isEmpty()) {
+                            throw new IllegalArgumentException("Invalid host " + host
+                                    + " as it is not part of the AsterixDB cluster. Valid choices are "
+                                    + StringUtils.join(ncMap.keySet(), ", "));
+                        }
+                        String[] ncArray = ncsOnIp.toArray(new String[] {});
+                        String nc = ncArray[random.nextInt(ncArray.length)];
+                        p = new Pair<String, Integer>(nc, port);
+                        break;
+
+                    case NC:
+                        p = new Pair<String, Integer>(host, port);
+                        if (!ncs.contains(host)) {
+                            throw new IllegalArgumentException("Invalid NC " + host
+                                    + " as it is not part of the AsterixDB cluster. Valid choices are "
+                                    + StringUtils.join(ncs, ", "));
+
+                        }
+                        break;
+                }
+                sockets.add(p);
+            }
+        } catch (Exception e) {
+            throw new AsterixException(e);
+        }
+    }
+
+    @Override
+    public synchronized IInputStreamProvider createInputStreamProvider(IHyracksTaskContext ctx, int partition)
+            throws HyracksDataException {
+        try {
+            Pair<String, Integer> socket = sockets.get(partition);
+            ServerSocket server;
+            server = new ServerSocket(socket.second);
+            return new SocketServerInputStreamProvider(server);
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    @Override
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
+        List<String> locations = new ArrayList<String>();
+        for (Pair<String, Integer> socket : sockets) {
+            locations.add(socket.first);
+        }
+        return new AlgebricksAbsolutePartitionConstraint(locations.toArray(new String[] {}));
+    }
+
+    public List<Pair<String, Integer>> getSockets() {
+        return sockets;
+    }
+
+    @Override
+    public DataSourceType getDataSourceType() {
+        return DataSourceType.STREAM;
+    }
+
+    @Override
+    public boolean isIndexible() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamProviderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamProviderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamProviderFactory.java
index 95378cb..7b09ade 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamProviderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamProviderFactory.java
@@ -28,6 +28,7 @@ import org.apache.asterix.external.input.stream.provider.TwitterFirehoseInputStr
 import org.apache.asterix.om.util.AsterixClusterProperties;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 /**
  * Factory class for creating @see{TwitterFirehoseFeedAdapter}. The adapter
@@ -53,7 +54,7 @@ public class TwitterFirehoseStreamProviderFactory implements IInputStreamProvide
     private Map<String, String> configuration;
 
     @Override
-    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception {
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
         String ingestionCardinalityParam = configuration.get(KEY_INGESTION_CARDINALITY);
         String ingestionLocationParam = configuration.get(KEY_INGESTION_LOCATIONS);
         String[] locations = null;
@@ -80,7 +81,7 @@ public class TwitterFirehoseStreamProviderFactory implements IInputStreamProvide
     }
 
     @Override
-    public void configure(Map<String, String> configuration) throws Exception {
+    public void configure(Map<String, String> configuration) {
         this.configuration = configuration;
     }
 
@@ -90,7 +91,8 @@ public class TwitterFirehoseStreamProviderFactory implements IInputStreamProvide
     }
 
     @Override
-    public IInputStreamProvider createInputStreamProvider(IHyracksTaskContext ctx, int partition) throws Exception {
+    public IInputStreamProvider createInputStreamProvider(IHyracksTaskContext ctx, int partition)
+            throws HyracksDataException {
         return new TwitterFirehoseInputStreamProvider(configuration, ctx, partition);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/HDFSInputStreamProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/HDFSInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/HDFSInputStreamProvider.java
index bf9653d..e1ab331 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/HDFSInputStreamProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/HDFSInputStreamProvider.java
@@ -38,12 +38,10 @@ public class HDFSInputStreamProvider<K> extends HDFSRecordReader<K, Text> implem
 
     public HDFSInputStreamProvider(boolean read[], InputSplit[] inputSplits, String[] readSchedule, String nodeName,
             JobConf conf, Map<String, String> configuration, List<ExternalFile> snapshot) throws Exception {
-        super(read, inputSplits, readSchedule, nodeName, conf);
+        super(read, inputSplits, readSchedule, nodeName, conf, snapshot,
+                snapshot == null ? null : ExternalIndexerProvider.getIndexer(configuration));
         value = new Text();
-        configure(configuration);
         if (snapshot != null) {
-            setSnapshot(snapshot);
-            setIndexer(ExternalIndexerProvider.getIndexer(configuration));
             if (currentSplitIndex < snapshot.size()) {
                 indexer.reset(this);
             }
@@ -51,7 +49,7 @@ public class HDFSInputStreamProvider<K> extends HDFSRecordReader<K, Text> implem
     }
 
     @Override
-    public AInputStream getInputStream() throws Exception {
+    public AInputStream getInputStream() {
         return new HDFSInputStream();
     }
 
@@ -119,10 +117,6 @@ public class HDFSInputStreamProvider<K> extends HDFSRecordReader<K, Text> implem
         }
 
         @Override
-        public void configure(Map<String, String> configuration) {
-        }
-
-        @Override
         public void setFeedLogManager(FeedLogManager logManager) {
         }
 
@@ -130,8 +124,4 @@ public class HDFSInputStreamProvider<K> extends HDFSRecordReader<K, Text> implem
         public void setController(AbstractFeedDataFlowController controller) {
         }
     }
-
-    @Override
-    public void configure(Map<String, String> configuration) {
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/LocalFSInputStreamProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/LocalFSInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/LocalFSInputStreamProvider.java
index 77520d4..fbe6035 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/LocalFSInputStreamProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/LocalFSInputStreamProvider.java
@@ -18,7 +18,6 @@
  */
 package org.apache.asterix.external.input.stream.provider;
 
-import java.io.IOException;
 import java.nio.file.Path;
 import java.util.Map;
 
@@ -27,38 +26,33 @@ import org.apache.asterix.external.input.stream.AInputStream;
 import org.apache.asterix.external.input.stream.LocalFileSystemInputStream;
 import org.apache.asterix.external.util.FeedLogManager;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.std.file.FileSplit;
 
 public class LocalFSInputStreamProvider implements IInputStreamProvider {
 
-    private String expression;
-    private boolean isFeed;
-    private Path path;
+    private final String expression;
+    private final boolean isFeed;
+    private final Path path;
     private FeedLogManager feedLogManager;
-    private Map<String, String> configuration;
 
-    public LocalFSInputStreamProvider(FileSplit[] fileSplits, IHyracksTaskContext ctx,
-            Map<String, String> configuration, int partition, String expression, boolean isFeed) {
+    public LocalFSInputStreamProvider(final FileSplit[] fileSplits, final IHyracksTaskContext ctx,
+            final Map<String, String> configuration, final int partition, final String expression,
+            final boolean isFeed) {
         this.expression = expression;
         this.isFeed = isFeed;
         this.path = fileSplits[partition].getLocalFile().getFile().toPath();
     }
 
     @Override
-    public AInputStream getInputStream() throws IOException {
-        LocalFileSystemInputStream stream = new LocalFileSystemInputStream(path, expression, isFeed);
+    public AInputStream getInputStream() throws HyracksDataException {
+        final LocalFileSystemInputStream stream = new LocalFileSystemInputStream(path, expression, isFeed);
         stream.setFeedLogManager(feedLogManager);
-        stream.configure(configuration);
         return stream;
     }
 
     @Override
-    public void configure(Map<String, String> configuration) {
-        this.configuration = configuration;
-    }
-
-    @Override
-    public void setFeedLogManager(FeedLogManager feedLogManager) {
+    public void setFeedLogManager(final FeedLogManager feedLogManager) {
         this.feedLogManager = feedLogManager;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/SocketClientInputStreamProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/SocketClientInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/SocketClientInputStreamProvider.java
new file mode 100644
index 0000000..f842638
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/SocketClientInputStreamProvider.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream.provider;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.Socket;
+
+import org.apache.asterix.external.api.IInputStreamProvider;
+import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+import org.apache.asterix.external.input.stream.AInputStream;
+import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.log4j.Logger;
+
+public class SocketClientInputStreamProvider implements IInputStreamProvider {
+
+    private static final Logger LOGGER = Logger.getLogger(SocketClientInputStreamProvider.class.getName());
+    private final Socket socket;
+
+    public SocketClientInputStreamProvider(Pair<String, Integer> ipAndPort) throws HyracksDataException {
+        try {
+            socket = new Socket(ipAndPort.first, ipAndPort.second);
+        } catch (IOException e) {
+            LOGGER.error(
+                    "Problem in creating socket against host " + ipAndPort.first + " on the port " + ipAndPort.second,
+                    e);
+            throw new HyracksDataException(e);
+        }
+    }
+
+    @Override
+    public AInputStream getInputStream() throws HyracksDataException {
+        InputStream in;
+        try {
+            in = socket.getInputStream();
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+        return new AInputStream() {
+            @Override
+            public int read() throws IOException {
+                throw new IOException("method not supported. use read(byte[] buffer, int offset, int length) instead");
+            }
+
+            @Override
+            public int read(byte[] buffer, int offset, int length) throws IOException {
+                return in.read(buffer, offset, length);
+            }
+
+            @Override
+            public boolean stop() throws Exception {
+                if (!socket.isClosed()) {
+                    try {
+                        in.close();
+                    } finally {
+                        socket.close();
+                    }
+                }
+                return true;
+            }
+
+            @Override
+            public boolean skipError() throws Exception {
+                return false;
+            }
+
+            @Override
+            public void setFeedLogManager(FeedLogManager logManager) {
+            }
+
+            @Override
+            public void setController(AbstractFeedDataFlowController controller) {
+            }
+        };
+    }
+
+    @Override
+    public void setFeedLogManager(FeedLogManager feedLogManager) {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/SocketInputStreamProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/SocketInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/SocketInputStreamProvider.java
deleted file mode 100644
index b6da314..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/SocketInputStreamProvider.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.stream.provider;
-
-import java.net.ServerSocket;
-import java.util.Map;
-
-import org.apache.asterix.external.api.IInputStreamProvider;
-import org.apache.asterix.external.input.stream.AInputStream;
-import org.apache.asterix.external.input.stream.SocketInputStream;
-import org.apache.asterix.external.util.FeedLogManager;
-
-public class SocketInputStreamProvider implements IInputStreamProvider {
-    private ServerSocket server;
-
-    public SocketInputStreamProvider(ServerSocket server) {
-        this.server = server;
-    }
-
-    @Override
-    public AInputStream getInputStream() throws Exception {
-        return new SocketInputStream(server);
-    }
-
-    @Override
-    public void configure(Map<String, String> configuration) {
-    }
-
-    @Override
-    public void setFeedLogManager(FeedLogManager feedLogManager) {
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/SocketServerInputStreamProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/SocketServerInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/SocketServerInputStreamProvider.java
new file mode 100644
index 0000000..64f0342
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/SocketServerInputStreamProvider.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream.provider;
+
+import java.net.ServerSocket;
+
+import org.apache.asterix.external.api.IInputStreamProvider;
+import org.apache.asterix.external.input.stream.AInputStream;
+import org.apache.asterix.external.input.stream.SocketServerInputStream;
+import org.apache.asterix.external.util.FeedLogManager;
+
+public class SocketServerInputStreamProvider implements IInputStreamProvider {
+    private final ServerSocket server;
+
+    public SocketServerInputStreamProvider(ServerSocket server) {
+        this.server = server;
+    }
+
+    @Override
+    public AInputStream getInputStream() {
+        return new SocketServerInputStream(server);
+    }
+
+    @Override
+    public void setFeedLogManager(FeedLogManager feedLogManager) {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java
index cd4a3c1..a979262 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java
@@ -35,40 +35,45 @@ import org.apache.asterix.external.input.stream.AInputStream;
 import org.apache.asterix.external.util.FeedLogManager;
 import org.apache.asterix.external.util.TweetGenerator;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class TwitterFirehoseInputStreamProvider implements IInputStreamProvider {
 
     private static final Logger LOGGER = Logger.getLogger(TwitterFirehoseInputStreamProvider.class.getName());
 
-    private ExecutorService executorService;
+    private final ExecutorService executorService;
 
-    private PipedOutputStream outputStream;
+    private final PipedOutputStream outputStream;
 
-    private PipedInputStream inputStream;
+    private final PipedInputStream inputStream;
 
-    private TwitterServer twitterServer;
+    private final TwitterServer twitterServer;
 
     public TwitterFirehoseInputStreamProvider(Map<String, String> configuration, IHyracksTaskContext ctx, int partition)
-            throws Exception {
-        executorService = Executors.newCachedThreadPool();
-        outputStream = new PipedOutputStream();
-        inputStream = new PipedInputStream(outputStream);
-        twitterServer = new TwitterServer(configuration, partition, outputStream, executorService, inputStream);
+            throws HyracksDataException {
+        try {
+            executorService = Executors.newCachedThreadPool();
+            outputStream = new PipedOutputStream();
+            inputStream = new PipedInputStream(outputStream);
+            twitterServer = new TwitterServer(configuration, partition, outputStream, executorService, inputStream);
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
     }
 
     @Override
-    public AInputStream getInputStream() throws Exception {
+    public AInputStream getInputStream() {
         return twitterServer;
     }
 
     private static class TwitterServer extends AInputStream {
         private final DataProvider dataProvider;
         private final ExecutorService executorService;
-        private InputStream in;
+        private final InputStream in;
         private boolean started;
 
         public TwitterServer(Map<String, String> configuration, int partition, OutputStream os,
-                ExecutorService executorService, InputStream in) throws Exception {
+                ExecutorService executorService, InputStream in) {
             dataProvider = new DataProvider(configuration, partition, os);
             this.executorService = executorService;
             this.in = in;
@@ -111,10 +116,6 @@ public class TwitterFirehoseInputStreamProvider implements IInputStreamProvider
         }
 
         @Override
-        public void configure(Map<String, String> configuration) {
-        }
-
-        @Override
         public void setFeedLogManager(FeedLogManager logManager) {
         }
 
@@ -127,7 +128,7 @@ public class TwitterFirehoseInputStreamProvider implements IInputStreamProvider
 
         public static final String KEY_MODE = "mode";
 
-        private TweetGenerator tweetGenerator;
+        private final TweetGenerator tweetGenerator;
         private boolean continuePush = true;
         private int batchSize;
         private final Mode mode;
@@ -138,7 +139,7 @@ public class TwitterFirehoseInputStreamProvider implements IInputStreamProvider
             CONTROLLED
         }
 
-        public DataProvider(Map<String, String> configuration, int partition, OutputStream os) throws Exception {
+        public DataProvider(Map<String, String> configuration, int partition, OutputStream os) {
             this.tweetGenerator = new TweetGenerator(configuration, partition);
             this.tweetGenerator.registerSubscriber(os);
             this.os = os;
@@ -163,7 +164,6 @@ public class TwitterFirehoseInputStreamProvider implements IInputStreamProvider
             boolean moreData = true;
             long startBatch;
             long endBatch;
-
             while (true) {
                 try {
                     while (moreData && continuePush) {
@@ -175,7 +175,7 @@ public class TwitterFirehoseInputStreamProvider implements IInputStreamProvider
                                 startBatch = System.currentTimeMillis();
                                 moreData = tweetGenerator.generateNextBatch(batchSize);
                                 endBatch = System.currentTimeMillis();
-                                if (endBatch - startBatch < 1000) {
+                                if ((endBatch - startBatch) < 1000) {
                                     Thread.sleep(1000 - (endBatch - startBatch));
                                 }
                                 break;
@@ -194,11 +194,6 @@ public class TwitterFirehoseInputStreamProvider implements IInputStreamProvider
         public void stop() {
             continuePush = false;
         }
-
-    }
-
-    @Override
-    public void configure(Map<String, String> configuration) {
     }
 
     @Override



Mime
View raw message