asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject [1/2] incubator-asterixdb git commit: ASTERIXDB-1302 ASTERIXDB-1301 Fix Socket Feed Connection
Date Fri, 26 Feb 2016 14:10:32 GMT
Repository: incubator-asterixdb
Updated Branches:
  refs/heads/master 5f7b496f9 -> e6f478d8f


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e6f478d8/asterix-app/src/test/resources/runtimets/results/feeds/feed-push-socket/feed-push-socket.1.adm
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/resources/runtimets/results/feeds/feed-push-socket/feed-push-socket.1.adm b/asterix-app/src/test/resources/runtimets/results/feeds/feed-push-socket/feed-push-socket.1.adm
new file mode 100644
index 0000000..7047dbc
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/feeds/feed-push-socket/feed-push-socket.1.adm
@@ -0,0 +1,10 @@
+{ "tweetid": "1", "tweetid-copy": "1", "user": { "screen-name": "RollandEckhardstein#211", "lang": "en", "friends_count": 3657079i32, "statuses_count": 268i32, "name": "Rolland Eckhardstein", "followers_count": 3311368i32 }, "sender-location": point("42.13,80.43"), "send-time": datetime("2005-12-05T21:06:41.000Z"), "send-time-copy": datetime("2005-12-05T21:06:41.000Z"), "referred-topics": {{ "samsung", "plan" }}, "message-text": " love samsung the plan is amazing" }
+{ "tweetid": "10", "tweetid-copy": "10", "user": { "screen-name": "Rolldstein#211", "lang": "en", "friends_count": 3657079i32, "statuses_count": 268i32, "name": "Rolland Eckhardstful", "followers_count": 3311368i32 }, "sender-location": point("46.94,93.98"), "send-time": datetime("2011-04-07T14:08:46.000Z"), "send-time-copy": datetime("2011-04-07T14:08:46.000Z"), "referred-topics": {{ "t-mobile", "signal" }}, "message-text": " like t-mobile the signal is good" }
+{ "tweetid": "2", "tweetid-copy": "2", "user": { "screen-name": "RollandEckhardstein#211", "lang": "en", "friends_count": 3657079i32, "statuses_count": 268i32, "name": "David Eckhardstein", "followers_count": 3311368i32 }, "sender-location": point("28.86,70.44"), "send-time": datetime("2007-08-15T06:44:17.000Z"), "send-time-copy": datetime("2007-08-15T06:44:17.000Z"), "referred-topics": {{ "sprint", "voice-clarity" }}, "message-text": " like sprint its voice-clarity is mind-blowing" }
+{ "tweetid": "3", "tweetid-copy": "3", "user": { "screen-name": "RollandEckhard#500", "lang": "en", "friends_count": 3657079i32, "statuses_count": 268i32, "name": "Rolland Hetfield", "followers_count": 3311368i32 }, "sender-location": point("39.84,86.48"), "send-time": datetime("2008-12-24T00:07:04.000Z"), "send-time-copy": datetime("2008-12-24T00:07:04.000Z"), "referred-topics": {{ "verizon", "voice-command" }}, "message-text": " can't stand verizon its voice-command is terrible:(" }
+{ "tweetid": "4", "tweetid-copy": "4", "user": { "screen-name": "RollandEckhardstein#221", "lang": "en", "friends_count": 3657079i32, "statuses_count": 268i32, "name": "Rolland Eckhardstinz", "followers_count": 3311368i32 }, "sender-location": point("27.67,87.32"), "send-time": datetime("2007-02-05T16:39:13.000Z"), "send-time-copy": datetime("2007-02-05T16:39:13.000Z"), "referred-topics": {{ "t-mobile", "customer-service" }}, "message-text": " love t-mobile its customer-service is mind-blowing" }
+{ "tweetid": "5", "tweetid-copy": "5", "user": { "screen-name": "RollandEcstein#211", "lang": "en", "friends_count": 3657079i32, "statuses_count": 268i32, "name": "Rolland Eckhardst", "followers_count": 3311368i32 }, "sender-location": point("27.3,92.77"), "send-time": datetime("2010-09-12T06:15:28.000Z"), "send-time-copy": datetime("2010-09-12T06:15:28.000Z"), "referred-topics": {{ "t-mobile", "customization" }}, "message-text": " like t-mobile the customization is amazing:)" }
+{ "tweetid": "6", "tweetid-copy": "6", "user": { "screen-name": "Rollkhardstein#211", "lang": "en", "friends_count": 3657079i32, "statuses_count": 268i32, "name": "Kirk Hammette ", "followers_count": 3311368i32 }, "sender-location": point("45.62,84.78"), "send-time": datetime("2012-01-23T06:23:13.000Z"), "send-time-copy": datetime("2012-01-23T06:23:13.000Z"), "referred-topics": {{ "iphone", "network" }}, "message-text": " like iphone its network is awesome:)" }
+{ "tweetid": "7", "tweetid-copy": "7", "user": { "screen-name": "andEckhardstein#211", "lang": "en", "friends_count": 3657079i32, "statuses_count": 268i32, "name": "Rolland khardstein", "followers_count": 3311368i32 }, "sender-location": point("44.12,81.46"), "send-time": datetime("2012-02-17T17:30:26.000Z"), "send-time-copy": datetime("2012-02-17T17:30:26.000Z"), "referred-topics": {{ "t-mobile", "network" }}, "message-text": " hate t-mobile the network is bad" }
+{ "tweetid": "8", "tweetid-copy": "8", "user": { "screen-name": "Rolltein#211", "lang": "en", "friends_count": 3657079i32, "statuses_count": 268i32, "name": "Ron Eckhardstein", "followers_count": 3311368i32 }, "sender-location": point("36.86,90.71"), "send-time": datetime("2009-03-12T13:18:04.000Z"), "send-time-copy": datetime("2009-03-12T13:18:04.000Z"), "referred-topics": {{ "at&t", "touch-screen" }}, "message-text": " dislike at&t its touch-screen is OMG" }
+{ "tweetid": "9", "tweetid-copy": "9", "user": { "screen-name": "Roldstein#211", "lang": "en", "friends_count": 3657079i32, "statuses_count": 268i32, "name": "Rolland Eckdstein", "followers_count": 3311368i32 }, "sender-location": point("29.07,97.05"), "send-time": datetime("2012-08-15T20:19:46.000Z"), "send-time-copy": datetime("2012-08-15T20:19:46.000Z"), "referred-topics": {{ "verizon", "speed" }}, "message-text": " hate verizon its speed is bad" }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e6f478d8/asterix-app/src/test/resources/runtimets/testsuite.xml
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterix-app/src/test/resources/runtimets/testsuite.xml
index a3a1fba..96a2c37 100644
--- a/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -36,6 +36,16 @@
           </compilation-unit>
         </test-case> -->
         <test-case FilePath="feeds">
+            <compilation-unit name="feed-push-socket">
+                <output-dir compare="Text">feed-push-socket</output-dir>
+            </compilation-unit>
+        </test-case>
+        <test-case FilePath="feeds">
+            <compilation-unit name="drop-dataverse-with-disconnected-feed">
+                <output-dir compare="Text">drop-dataverse-with-disconnected-feed</output-dir>
+            </compilation-unit>
+        </test-case>
+        <test-case FilePath="feeds">
             <compilation-unit name="feed-with-external-parser">
                 <output-dir compare="Text">feed-with-external-parser</output-dir>
             </compilation-unit>

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e6f478d8/asterix-common/src/test/java/org/apache/asterix/test/client/FileFeedSocketAdapterClient.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/test/java/org/apache/asterix/test/client/FileFeedSocketAdapterClient.java b/asterix-common/src/test/java/org/apache/asterix/test/client/FileFeedSocketAdapterClient.java
new file mode 100644
index 0000000..765dc71
--- /dev/null
+++ b/asterix-common/src/test/java/org/apache/asterix/test/client/FileFeedSocketAdapterClient.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.client;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+
+public class FileFeedSocketAdapterClient implements ITestClient {
+    private final int port;
+    private final int wait;
+    private final String url;
+    private Socket socket;
+    private String path;
+    private int batchSize;
+    private int maxCount;
+    private OutputStream out = null;
+
+    // expected args: url, source-file-path, max-count, batch-size, wait
+    public FileFeedSocketAdapterClient(int port, String[] args) throws Exception {
+        this.port = port;
+        if (args.length != 5) {
+            throw new Exception(
+                    "Invalid arguments for FileFeedSocketAdapterClient. Expected arguments <url> <source-file-path> <max-count> <batch-size> <wait>");
+        }
+        this.url = args[0];
+        this.path = args[1];
+        this.maxCount = Integer.parseInt(args[2]);
+        this.batchSize = Integer.parseInt(args[3]);
+        this.wait = Integer.parseInt(args[4]);
+    }
+
+    @Override
+    public void start() {
+        try {
+            socket = new Socket(url, port);
+        } catch (IOException e) {
+            System.err.println("Problem in creating socket against host " + url + " on the port " + port);
+            e.printStackTrace();
+        }
+
+        int recordCount = 0;
+        BufferedReader br = null;
+        try {
+            out = socket.getOutputStream();
+            br = new BufferedReader(new FileReader(path));
+            String nextRecord;
+            while ((nextRecord = br.readLine()) != null) {
+                ByteBuffer b = StandardCharsets.UTF_8.encode(nextRecord);
+                if (wait >= 1 && recordCount % batchSize == 0) {
+                    Thread.sleep(wait);
+                }
+                out.write(b.array(), 0, b.limit());
+                recordCount++;
+                if (recordCount == maxCount) {
+                    break;
+                }
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            if (br != null) {
+                try {
+                    br.close();
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+
+    @Override
+    public void stop() throws Exception {
+        if (out != null) {
+            try {
+                out.close();
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
+        try {
+            if (socket != null) {
+                socket.close();
+            }
+        } catch (IOException e) {
+            System.err.println("Problem in closing socket against host " + url + " on the port " + port);
+            e.printStackTrace();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e6f478d8/asterix-common/src/test/java/org/apache/asterix/test/client/ITestClient.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/test/java/org/apache/asterix/test/client/ITestClient.java b/asterix-common/src/test/java/org/apache/asterix/test/client/ITestClient.java
new file mode 100644
index 0000000..56d626d
--- /dev/null
+++ b/asterix-common/src/test/java/org/apache/asterix/test/client/ITestClient.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.client;
+
+public interface ITestClient {
+
+    public void start() throws Exception;
+
+    public void stop() throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e6f478d8/asterix-common/src/test/java/org/apache/asterix/test/client/TestClientProvider.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/test/java/org/apache/asterix/test/client/TestClientProvider.java b/asterix-common/src/test/java/org/apache/asterix/test/client/TestClientProvider.java
new file mode 100644
index 0000000..d26351b
--- /dev/null
+++ b/asterix-common/src/test/java/org/apache/asterix/test/client/TestClientProvider.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.client;
+
+import java.util.Arrays;
+
+public class TestClientProvider {
+
+    public static ITestClient createTestClient(String[] args, int port) throws Exception {
+        if (args.length < 1) {
+            throw new Exception("Unspecified test client");
+        }
+        String clientName = args[0];
+        String[] clientArgs = Arrays.copyOfRange(args, 1, args.length);
+        switch (clientName) {
+            case "file-client":
+                return new FileFeedSocketAdapterClient(port, clientArgs);
+            default:
+                throw new Exception("Unknown test client: " + clientName);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e6f478d8/asterix-common/src/test/java/org/apache/asterix/test/server/FileTestServer.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/test/java/org/apache/asterix/test/server/FileTestServer.java b/asterix-common/src/test/java/org/apache/asterix/test/server/FileTestServer.java
index f40cce4..ba32af2 100644
--- a/asterix-common/src/test/java/org/apache/asterix/test/server/FileTestServer.java
+++ b/asterix-common/src/test/java/org/apache/asterix/test/server/FileTestServer.java
@@ -26,10 +26,10 @@ import java.net.ServerSocket;
 import java.net.Socket;
 
 public class FileTestServer implements ITestServer {
-    private String[] paths;
-    private final int port;
-    private ServerSocket serverSocket;
-    private Thread listenerThread;
+    protected String[] paths;
+    protected final int port;
+    protected ServerSocket serverSocket;
+    protected Thread listenerThread;
 
     public FileTestServer(int port) {
         this.port = port;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e6f478d8/asterix-common/src/test/java/org/apache/asterix/test/server/ITestServer.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/test/java/org/apache/asterix/test/server/ITestServer.java b/asterix-common/src/test/java/org/apache/asterix/test/server/ITestServer.java
index 18a4969..b3b1183 100644
--- a/asterix-common/src/test/java/org/apache/asterix/test/server/ITestServer.java
+++ b/asterix-common/src/test/java/org/apache/asterix/test/server/ITestServer.java
@@ -20,7 +20,7 @@ package org.apache.asterix.test.server;
 
 public interface ITestServer {
 
-    public void configure(String[] args);
+    public void configure(String[] args) throws Exception;
 
     public void start() throws Exception;
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e6f478d8/asterix-common/src/test/java/org/apache/asterix/test/server/OpenSocketFileTestServer.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/test/java/org/apache/asterix/test/server/OpenSocketFileTestServer.java b/asterix-common/src/test/java/org/apache/asterix/test/server/OpenSocketFileTestServer.java
new file mode 100644
index 0000000..1c2cef6
--- /dev/null
+++ b/asterix-common/src/test/java/org/apache/asterix/test/server/OpenSocketFileTestServer.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.server;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+
+public class OpenSocketFileTestServer extends FileTestServer {
+
+    private boolean closed;
+
+    public OpenSocketFileTestServer(int port) {
+        super(port);
+    }
+
+    @Override
+    public void start() throws IOException {
+        serverSocket = new ServerSocket(port);
+        listenerThread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                while (!serverSocket.isClosed()) {
+                    try {
+                        Socket socket = serverSocket.accept();
+                        new Thread(new SocketThread(socket)).start();
+                    } catch (IOException e) {
+                        e.printStackTrace();
+                        // Do nothing. This means the socket was closed for some reason.
+                        // There is nothing to do here except try to close the socket and see if the
+                        // server is still listening!
+                        // This also could be due to the close() call
+                    }
+                }
+            }
+        });
+        listenerThread.start();
+    }
+
+    private class SocketThread implements Runnable {
+        private Socket socket;
+        private OutputStream os;
+
+        public SocketThread(Socket socket) {
+            this.socket = socket;
+        }
+
+        @Override
+        public void run() {
+            try {
+                os = socket.getOutputStream();
+                byte[] chunk = new byte[1024];
+                for (String path : paths) {
+                    try (FileInputStream fin = new FileInputStream(new File(path))) {
+                        int read = fin.read(chunk);
+                        while (read > 0) {
+                            os.write(chunk, 0, read);
+                            read = fin.read(chunk);
+                        }
+                    }
+                }
+            } catch (Throwable th) {
+                th.printStackTrace();
+                // There are two possibilities here:
+                // 1. The socket was closed from the other end.
+                // 2. Server.close() was called.
+            } finally {
+                synchronized (serverSocket) {
+                    if (!closed) {
+                        try {
+                            serverSocket.wait();
+                        } catch (InterruptedException e) {
+                            e.printStackTrace();
+                        }
+                    }
+                }
+                try {
+                    os.close();
+                } catch (Throwable th) {
+                    th.printStackTrace();
+                }
+                try {
+                    socket.close();
+                } catch (Throwable th) {
+                    th.printStackTrace();
+                }
+            }
+        }
+    }
+
+    @Override
+    public void stop() throws IOException, InterruptedException {
+        synchronized (serverSocket) {
+            closed = true;
+            try {
+                serverSocket.close();
+                if (listenerThread.isAlive()) {
+                    listenerThread.join();
+                }
+            } finally {
+                serverSocket.notifyAll();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e6f478d8/asterix-common/src/test/java/org/apache/asterix/test/server/TestClientServer.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/test/java/org/apache/asterix/test/server/TestClientServer.java b/asterix-common/src/test/java/org/apache/asterix/test/server/TestClientServer.java
new file mode 100644
index 0000000..3312d1b
--- /dev/null
+++ b/asterix-common/src/test/java/org/apache/asterix/test/server/TestClientServer.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.server;
+
+import org.apache.asterix.test.client.ITestClient;
+import org.apache.asterix.test.client.TestClientProvider;
+
+public class TestClientServer implements ITestServer {
+
+    // port of the server to connect to
+    private final int port;
+    private ITestClient client;
+
+    public TestClientServer(int port) {
+        this.port = port;
+    }
+
+    @Override
+    public void configure(String[] args) throws Exception {
+        client = TestClientProvider.createTestClient(args, port);
+    }
+
+    @Override
+    public void start() throws Exception {
+        client.start();
+    }
+
+    @Override
+    public void stop() throws Exception {
+        client.stop();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e6f478d8/asterix-common/src/test/java/org/apache/asterix/test/server/TestServerProvider.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/test/java/org/apache/asterix/test/server/TestServerProvider.java b/asterix-common/src/test/java/org/apache/asterix/test/server/TestServerProvider.java
index 60c1c11..0bdb74e 100644
--- a/asterix-common/src/test/java/org/apache/asterix/test/server/TestServerProvider.java
+++ b/asterix-common/src/test/java/org/apache/asterix/test/server/TestServerProvider.java
@@ -26,6 +26,10 @@ public class TestServerProvider {
                 return new FileTestServer(port);
             case "rss":
                 return new RSSTestServer(port);
+            case "open-socket-file":
+                return new OpenSocketFileTestServer(port);
+            case "client":
+                return new TestClientServer(port);
             default:
                 throw new Exception("Unknown test server");
         }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e6f478d8/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
index d5b1c6e..851acd4 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
@@ -38,7 +38,7 @@ import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.external.util.FeedUtils;
 import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.dataflow.std.file.FileSplit;
 
@@ -66,7 +66,7 @@ public class GenericAdapterFactory implements IIndexingAdapterFactory, IAdapterF
     }
 
     @Override
-    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception {
         return dataSourceFactory.getPartitionConstraint();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e6f478d8/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
index 17916e5..3965e5e 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
@@ -22,7 +22,7 @@ import java.io.Serializable;
 import java.util.Map;
 
 import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 
 /**
@@ -50,7 +50,7 @@ public interface IAdapterFactory extends Serializable {
      * In the former case, the IP address is translated to a node controller id
      * running on the node with the given IP address.
      */
-    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception;
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception;
 
     /**
      * Creates an instance of IDatasourceAdapter.

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e6f478d8/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
index 370ea93..1487cf1 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
@@ -19,9 +19,12 @@
 package org.apache.asterix.external.api;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Map;
 
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.asterix.om.util.AsterixAppContextInfo;
+import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 
 public interface IExternalDataSourceFactory extends Serializable {
 
@@ -45,7 +48,7 @@ public interface IExternalDataSourceFactory extends Serializable {
      * @return
      * @throws Exception
      */
-    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception;
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception;
 
     /**
      * Configure the data parser factory. The passed map contains key value pairs from the
@@ -63,4 +66,32 @@ public interface IExternalDataSourceFactory extends Serializable {
         return false;
     }
 
+    public static AlgebricksAbsolutePartitionConstraint getPartitionConstraints(
+            AlgebricksAbsolutePartitionConstraint constraints, int count) {
+        if (constraints == null) {
+            ArrayList<String> locs = new ArrayList<String>();
+            Map<String, String[]> stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores();
+            int i = 0;
+            while (i < count) {
+                for (String node : stores.keySet()) {
+                    int numIODevices = AsterixClusterProperties.INSTANCE.getNumberOfIODevices(node);
+                    for (int k = 0; k < numIODevices; k++) {
+                        locs.add(node);
+                        i++;
+                        if (i == count) {
+                            break;
+                        }
+                    }
+                    if (i == count) {
+                        break;
+                    }
+                }
+            }
+            String[] cluster = new String[locs.size()];
+            cluster = locs.toArray(cluster);
+            constraints = new AlgebricksAbsolutePartitionConstraint(cluster);
+        }
+        return constraints;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e6f478d8/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java
index adb2602..fdc54d6 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java
@@ -25,5 +25,4 @@ public interface IRecordReaderFactory<T> extends IExternalDataSourceFactory {
     public IRecordReader<? extends T> createRecordReader(IHyracksTaskContext ctx, int partition) throws Exception;
 
     public Class<? extends T> getRecordClass();
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e6f478d8/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
index 5b3828d..6e3ead2 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
@@ -40,7 +40,6 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.hdfs.dataflow.ConfFactory;
@@ -51,7 +50,7 @@ public class HDFSDataSourceFactory
         implements IInputStreamProviderFactory, IRecordReaderFactory<Object>, IIndexibleExternalDataSource {
 
     protected static final long serialVersionUID = 1L;
-    protected transient AlgebricksPartitionConstraint clusterLocations;
+    protected transient AlgebricksAbsolutePartitionConstraint clusterLocations;
     protected String[] readSchedule;
     protected boolean read[];
     protected InputSplitsFactory inputSplitsFactory;
@@ -76,7 +75,7 @@ public class HDFSDataSourceFactory
         JobConf conf = HDFSUtils.configureHDFSJobConf(configuration);
         confFactory = new ConfFactory(conf);
         clusterLocations = getPartitionConstraint();
-        int numPartitions = ((AlgebricksAbsolutePartitionConstraint) clusterLocations).getLocations().length;
+        int numPartitions = clusterLocations.getLocations().length;
         // if files list was set, we restrict the splits to the list
         InputSplit[] inputSplits;
         if (files == null) {
@@ -99,7 +98,8 @@ public class HDFSDataSourceFactory
         }
     }
 
-    // Used to tell the factory to restrict the splits to the intersection between this list and the actual files on hdfs side
+    // Used to tell the factory to restrict the splits to the intersection between this list and the
+    // actual files on hdfs side
     @Override
     public void setSnapshot(List<ExternalFile> files, boolean indexingOp) {
         this.files = files;
@@ -108,7 +108,8 @@ public class HDFSDataSourceFactory
 
     /*
      * The method below was modified to take care of the following
-     * 1. when target files are not null, it generates a file aware input stream that validate against the files
+     * 1. when target files are not null, it generates a file aware input stream that validate
+     * against the files
      * 2. if the data is binary, it returns a generic reader
      */
     @Override
@@ -135,7 +136,7 @@ public class HDFSDataSourceFactory
      * @return
      */
     @Override
-    public AlgebricksPartitionConstraint getPartitionConstraint() {
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
         clusterLocations = HDFSUtils.getPartitionConstraints(clusterLocations);
         return clusterLocations;
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e6f478d8/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReaderFactory.java
index b9b6f65..b715a26 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReaderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReaderFactory.java
@@ -29,7 +29,7 @@ import org.apache.asterix.external.input.record.RecordWithMetadata;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.om.util.AsterixClusterProperties;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 
 import com.couchbase.client.core.CouchbaseCore;
@@ -71,7 +71,7 @@ public class CouchbaseReaderFactory implements IRecordReaderFactory<RecordWithMe
     }
 
     @Override
-    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception {
         return AsterixClusterProperties.INSTANCE.getClusterLocations();
     }
 
@@ -100,7 +100,8 @@ public class CouchbaseReaderFactory implements IRecordReaderFactory<RecordWithMe
     }
 
     /*
-     * We distribute the work of streaming vbuckets between all the partitions in a round robin fashion.
+     * We distribute the work of streaming vbuckets between all the partitions in a round robin
+     * fashion.
      */
     private void schedule() {
         schedule = new int[numOfVBuckets];

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e6f478d8/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSLookupReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSLookupReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSLookupReaderFactory.java
index c302b9b..22488f7 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSLookupReaderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSLookupReaderFactory.java
@@ -28,14 +28,14 @@ import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.HDFSUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.hdfs.dataflow.ConfFactory;
 
 public class HDFSLookupReaderFactory<T> implements ILookupReaderFactory<T> {
 
     protected static final long serialVersionUID = 1L;
-    protected transient AlgebricksPartitionConstraint clusterLocations;
+    protected transient AlgebricksAbsolutePartitionConstraint clusterLocations;
     protected ConfFactory confFactory;
     protected Map<String, String> configuration;
 
@@ -48,7 +48,7 @@ public class HDFSLookupReaderFactory<T> implements ILookupReaderFactory<T> {
     }
 
     @Override
-    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception {
         clusterLocations = HDFSUtils.getPartitionConstraints(clusterLocations);
         return clusterLocations;
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e6f478d8/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
index bbe485c..beceea8 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
@@ -22,11 +22,11 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.asterix.external.api.IExternalDataSourceFactory;
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.api.IRecordReaderFactory;
 import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 
 import com.sun.syndication.feed.synd.SyndEntryImpl;
@@ -36,6 +36,7 @@ public class RSSRecordReaderFactory implements IRecordReaderFactory<SyndEntryImp
     private static final long serialVersionUID = 1L;
     private Map<String, String> configuration;
     private List<String> urls = new ArrayList<String>();
+    private transient AlgebricksAbsolutePartitionConstraint clusterLocations;
 
     @Override
     public DataSourceType getDataSourceType() {
@@ -43,8 +44,10 @@ public class RSSRecordReaderFactory implements IRecordReaderFactory<SyndEntryImp
     }
 
     @Override
-    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
-        return new AlgebricksCountPartitionConstraint(urls.size());
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception {
+        int count = urls.size();
+        clusterLocations = IExternalDataSourceFactory.getPartitionConstraints(clusterLocations, count);
+        return clusterLocations;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e6f478d8/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReaderFactory.java
index f02bd93..d02de03 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReaderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReaderFactory.java
@@ -29,7 +29,7 @@ import org.apache.asterix.external.api.IInputStreamProviderFactory;
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.api.IRecordReaderFactory;
 import org.apache.asterix.external.indexing.ExternalFile;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 
 public abstract class AbstractStreamRecordReaderFactory<T>
@@ -51,7 +51,7 @@ public abstract class AbstractStreamRecordReaderFactory<T>
     }
 
     @Override
-    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception {
         return inputStreamFactory.getPartitionConstraint();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e6f478d8/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
index 9b2d095..f41486e 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
@@ -77,7 +77,7 @@ public class SemiStructuredRecordReader extends AbstractStreamRecordReader {
         inString = false;
         depth = 0;
         do {
-            int startPosn = bufferPosn; //starting from where we left off the last time
+            int startPosn = bufferPosn; // starting from where we left off the last time
             if (bufferPosn >= bufferLength) {
                 startPosn = bufferPosn = 0;
                 bufferLength = reader.read(inputBuffer);
@@ -87,7 +87,7 @@ public class SemiStructuredRecordReader extends AbstractStreamRecordReader {
                 }
             }
             if (!hasStarted) {
-                for (; bufferPosn < bufferLength; ++bufferPosn) { //search for record begin
+                for (; bufferPosn < bufferLength; ++bufferPosn) { // search for record begin
                     if (inputBuffer[bufferPosn] == recordStart) {
                         startPosn = bufferPosn;
                         hasStarted = true;
@@ -108,7 +108,7 @@ public class SemiStructuredRecordReader extends AbstractStreamRecordReader {
                 }
             }
             if (hasStarted) {
-                for (; bufferPosn < bufferLength; ++bufferPosn) { //search for record begin
+                for (; bufferPosn < bufferLength; ++bufferPosn) { // search for record begin
                     if (inString) {
                         // we are in a string, we only care about the string end
                         if (inputBuffer[bufferPosn] == ExternalDataConstants.QUOTE && !prevCharEscape) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e6f478d8/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
index f38c2cb..a2a4742 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
@@ -23,6 +23,7 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IExternalDataSourceFactory;
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.api.IRecordReaderFactory;
 import org.apache.asterix.external.util.ExternalDataConstants;
@@ -30,8 +31,7 @@ import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.external.util.TwitterUtil;
 import org.apache.asterix.external.util.TwitterUtil.AuthenticationConstants;
 import org.apache.asterix.external.util.TwitterUtil.SearchAPIConstants;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 
 import twitter4j.Status;
@@ -46,6 +46,7 @@ public class TwitterRecordReaderFactory implements IRecordReaderFactory<Status>
 
     private Map<String, String> configuration;
     private boolean pull;
+    private transient AlgebricksAbsolutePartitionConstraint clusterLocations;
 
     @Override
     public DataSourceType getDataSourceType() {
@@ -53,8 +54,9 @@ public class TwitterRecordReaderFactory implements IRecordReaderFactory<Status>
     }
 
     @Override
-    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
-        return new AlgebricksCountPartitionConstraint(INTAKE_CARDINALITY);
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception {
+        clusterLocations = IExternalDataSourceFactory.getPartitionConstraints(clusterLocations, INTAKE_CARDINALITY);
+        return clusterLocations;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e6f478d8/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java
index e780c95..89008aa 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java
@@ -20,16 +20,28 @@ package org.apache.asterix.external.input.stream;
 
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.StandardCharsets;
 
 import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.FeedLogManager;
 
 public class AInputStreamReader extends InputStreamReader {
     private AInputStream in;
+    private byte[] bytes = new byte[ExternalDataConstants.DEFAULT_BUFFER_SIZE];
+    private ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+    private CharBuffer charBuffer = CharBuffer.allocate(ExternalDataConstants.DEFAULT_BUFFER_SIZE);
+    private CharsetDecoder decoder;
+    private boolean done = false;
 
     public AInputStreamReader(AInputStream in) {
         super(in);
         this.in = in;
+        this.decoder = StandardCharsets.UTF_8.newDecoder();
+        this.byteBuffer.flip();
     }
 
     public boolean skipError() throws Exception {
@@ -51,4 +63,33 @@ public class AInputStreamReader extends InputStreamReader {
     public void setFeedLogManager(FeedLogManager feedLogManager) {
         in.setFeedLogManager(feedLogManager);
     }
+
+    @Override
+    public int read(char cbuf[]) throws IOException {
+        return read(cbuf, 0, cbuf.length);
+    }
+
+    @Override
+    public int read(char cbuf[], int offset, int length) throws IOException {
+        if (done) {
+            return -1;
+        }
+        charBuffer.clear();
+        if (byteBuffer.hasRemaining()) {
+            decoder.decode(byteBuffer, charBuffer, false);
+            System.arraycopy(charBuffer.array(), 0, cbuf, offset, charBuffer.position());
+            return charBuffer.position();
+        }
+        int len = in.read(bytes, 0, bytes.length);
+        if (len == -1) {
+            done = true;
+            return len;
+        }
+        byteBuffer.clear();
+        byteBuffer.position(len);
+        byteBuffer.flip();
+        decoder.decode(byteBuffer, charBuffer, false);
+        System.arraycopy(charBuffer.array(), 0, cbuf, offset, charBuffer.position());
+        return charBuffer.position();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e6f478d8/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStream.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStream.java
index 1e86f39..cf8d339 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStream.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStream.java
@@ -25,7 +25,9 @@ import java.net.Socket;
 import java.util.Map;
 
 import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+import org.apache.asterix.external.util.ExternalDataExceptionUtils;
 import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class SocketInputStream extends AInputStream {
     private ServerSocket server;
@@ -34,8 +36,13 @@ public class SocketInputStream extends AInputStream {
 
     public SocketInputStream(ServerSocket server) throws IOException {
         this.server = server;
-        socket = server.accept();
-        connectionStream = socket.getInputStream();
+        socket = new Socket();
+        connectionStream = new InputStream() {
+            @Override
+            public int read() throws IOException {
+                return -1;
+            }
+        };
     }
 
     @Override
@@ -56,20 +63,31 @@ public class SocketInputStream extends AInputStream {
 
     @Override
     public int read(byte b[]) throws IOException {
-        int read = connectionStream.read(b, 0, b.length);
-        while (read < 0) {
-            accept();
-            read = connectionStream.read(b, 0, b.length);
-        }
-        return read;
+        return read(b, 0, b.length);
     }
 
     @Override
     public int read(byte b[], int off, int len) throws IOException {
-        int read = connectionStream.read(b, off, len);
-        while (read < 0) {
-            accept();
+        if (server == null) {
+            return -1;
+        }
+        int read = -1;
+        try {
             read = connectionStream.read(b, off, len);
+        } catch (IOException e) {
+            e.printStackTrace();
+            read = -1;
+        }
+        while (read < 0) {
+            if (!accept()) {
+                return -1;
+            }
+            try {
+                read = connectionStream.read(b, off, len);
+            } catch (IOException e) {
+                e.printStackTrace();
+                read = -1;
+            }
         }
         return read;
     }
@@ -85,22 +103,57 @@ public class SocketInputStream extends AInputStream {
     }
 
     @Override
-    public void close() throws IOException {
-        connectionStream.close();
-        socket.close();
-        server.close();
+    public synchronized void close() throws IOException {
+        HyracksDataException hde = null;
+        try {
+            if (connectionStream != null) {
+                connectionStream.close();
+            }
+            connectionStream = null;
+        } catch (IOException e) {
+            hde = new HyracksDataException(e);
+        }
+        try {
+            if (socket != null) {
+                socket.close();
+            }
+            socket = null;
+        } catch (IOException e) {
+            hde = ExternalDataExceptionUtils.suppress(hde, e);
+        }
+        try {
+            if (server != null) {
+                server.close();
+            }
+        } catch (IOException e) {
+            hde = ExternalDataExceptionUtils.suppress(hde, e);
+        } finally {
+            server = null;
+        }
+        if (hde != null) {
+            throw hde;
+        }
     }
 
-    private void accept() throws IOException {
-        connectionStream.close();
-        socket.close();
-        socket = server.accept();
-        connectionStream = socket.getInputStream();
+    private boolean accept() throws IOException {
+        try {
+            connectionStream.close();
+            connectionStream = null;
+            socket.close();
+            socket = null;
+            socket = server.accept();
+            connectionStream = socket.getInputStream();
+            return true;
+        } catch (Exception e) {
+            close();
+            return false;
+        }
     }
 
     @Override
     public boolean stop() throws Exception {
-        return false;
+        close();
+        return true;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e6f478d8/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java
index 3f70ce1..5c1583e 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java
@@ -34,7 +34,6 @@ import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.external.util.FeedUtils;
 import org.apache.asterix.external.util.NodeResolverFactory;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.dataflow.std.file.FileSplit;
@@ -48,7 +47,8 @@ public class LocalFSInputStreamProviderFactory implements IInputStreamProviderFa
     protected static INodeResolver nodeResolver;
     protected Map<String, String> configuration;
     protected FileSplit[] inputFileSplits;
-    protected FileSplit[] feedLogFileSplits; // paths where instances of this feed can use as log storage
+    protected FileSplit[] feedLogFileSplits; // paths where instances of this feed can use as log
+                                             // storage
     protected boolean isFeed;
     protected String expression;
     // transient fields (They don't need to be serialized and transferred)
@@ -84,7 +84,7 @@ public class LocalFSInputStreamProviderFactory implements IInputStreamProviderFa
     }
 
     @Override
-    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception {
         return constraints;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e6f478d8/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketInputStreamProviderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketInputStreamProviderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketInputStreamProviderFactory.java
index ea60f43..6fdc42d 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketInputStreamProviderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketInputStreamProviderFactory.java
@@ -35,7 +35,6 @@ import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.om.util.AsterixRuntimeUtil;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 
@@ -106,7 +105,7 @@ public class SocketInputStreamProviderFactory implements IInputStreamProviderFac
     }
 
     @Override
-    public AlgebricksPartitionConstraint getPartitionConstraint() {
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
         List<String> locations = new ArrayList<String>();
         for (Pair<String, Integer> socket : sockets) {
             locations.add(socket.first);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e6f478d8/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamProviderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamProviderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamProviderFactory.java
index 484626a..95378cb 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamProviderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamProviderFactory.java
@@ -27,7 +27,6 @@ import org.apache.asterix.external.api.IInputStreamProviderFactory;
 import org.apache.asterix.external.input.stream.provider.TwitterFirehoseInputStreamProvider;
 import org.apache.asterix.om.util.AsterixClusterProperties;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 
 /**
@@ -54,7 +53,7 @@ public class TwitterFirehoseStreamProviderFactory implements IInputStreamProvide
     private Map<String, String> configuration;
 
     @Override
-    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception {
         String ingestionCardinalityParam = configuration.get(KEY_INGESTION_CARDINALITY);
         String ingestionLocationParam = configuration.get(KEY_INGESTION_LOCATIONS);
         String[] locations = null;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e6f478d8/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java
index e39b507..cd4a3c1 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java
@@ -58,7 +58,6 @@ public class TwitterFirehoseInputStreamProvider implements IInputStreamProvider
 
     @Override
     public AInputStream getInputStream() throws Exception {
-        twitterServer.start();
         return twitterServer;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e6f478d8/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
index 7e28c35..d0348c2 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
@@ -149,7 +149,7 @@ public class FeedCollectOperatorDescriptor extends AbstractSingleActivityOperato
     private IngestionRuntime getIntakeRuntime(SubscribableFeedRuntimeId subscribableRuntimeId) {
         int waitCycleCount = 0;
         ISubscribableRuntime ingestionRuntime = subscriptionManager.getSubscribableRuntime(subscribableRuntimeId);
-        while (ingestionRuntime == null && waitCycleCount < 10) {
+        while (ingestionRuntime == null && waitCycleCount < 1000) {
             try {
                 Thread.sleep(3000);
                 waitCycleCount++;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e6f478d8/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java
index 3cb5d64..36c11e9 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java
@@ -241,7 +241,8 @@ public class FeedMessageOperatorNodePushable extends AbstractUnaryOutputSourceOp
         FeedRuntimeId runtimeId = null;
         FeedRuntimeType subscribableRuntimeType = ((EndFeedMessage) message).getSourceRuntimeType();
         if (endFeedMessage.isCompleteDisconnection()) {
-            // subscribableRuntimeType represents the location at which the feed connection receives data
+            // subscribableRuntimeType represents the location at which the feed connection receives
+            // data
             FeedRuntimeType runtimeType = null;
             switch (subscribableRuntimeType) {
                 case INTAKE:
@@ -257,15 +258,19 @@ public class FeedMessageOperatorNodePushable extends AbstractUnaryOutputSourceOp
             runtimeId = new FeedRuntimeId(runtimeType, partition, FeedRuntimeId.DEFAULT_OPERAND_ID);
             CollectionRuntime feedRuntime = (CollectionRuntime) feedManager.getFeedConnectionManager()
                     .getFeedRuntime(connectionId, runtimeId);
-            feedRuntime.getSourceRuntime().unsubscribeFeed(feedRuntime);
+            if (feedRuntime != null) {
+                feedRuntime.getSourceRuntime().unsubscribeFeed(feedRuntime);
+            }
             if (LOGGER.isLoggable(Level.INFO)) {
                 LOGGER.info("Complete Unsubscription of " + endFeedMessage.getFeedConnectionId());
             }
         } else {
-            // subscribaleRuntimeType represents the location for data hand-off in presence of subscribers
+            // subscribaleRuntimeType represents the location for data hand-off in presence of
+            // subscribers
             switch (subscribableRuntimeType) {
                 case INTAKE:
-                    // illegal state as data hand-off from one feed to another does not happen at intake
+                    // illegal state as data hand-off from one feed to another does not happen at
+                    // intake
                     throw new IllegalStateException("Illegal State, invalid runtime type  " + subscribableRuntimeType);
                 case COMPUTE:
                     // feed could be primary or secondary, doesn't matter

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e6f478d8/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
index c128545..50d8ac0 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
@@ -58,13 +58,12 @@ public class FeedUtils {
 
     public static FileSplit[] splitsForAdapter(String dataverseName, String feedName,
             AlgebricksPartitionConstraint partitionConstraints) throws Exception {
-        File relPathFile = new File(prepareDataverseFeedName(dataverseName, feedName));
-        String[] locations = null;
         if (partitionConstraints.getPartitionConstraintType() == PartitionConstraintType.COUNT) {
             throw new AlgebricksException("Can't create file splits for adapter with count partitioning constraints");
-        } else {
-            locations = ((AlgebricksAbsolutePartitionConstraint) partitionConstraints).getLocations();
         }
+        File relPathFile = new File(prepareDataverseFeedName(dataverseName, feedName));
+        String[] locations = null;
+        locations = ((AlgebricksAbsolutePartitionConstraint) partitionConstraints).getLocations();
         List<FileSplit> splits = new ArrayList<FileSplit>();
         String storageDirName = AsterixClusterProperties.INSTANCE.getStorageDirectoryName();
         int i = 0;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e6f478d8/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
index 7ac0428..9a72135 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
@@ -42,7 +42,6 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.api.context.ICCContext;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.hdfs.scheduler.Scheduler;
@@ -199,8 +198,8 @@ public class HDFSUtils {
         return conf;
     }
 
-    public static AlgebricksPartitionConstraint getPartitionConstraints(
-            AlgebricksPartitionConstraint clusterLocations) {
+    public static AlgebricksAbsolutePartitionConstraint getPartitionConstraints(
+            AlgebricksAbsolutePartitionConstraint clusterLocations) {
         if (clusterLocations == null) {
             ArrayList<String> locs = new ArrayList<String>();
             Map<String, String[]> stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores();

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e6f478d8/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java b/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
index e34a09b..6b11d21 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
@@ -23,14 +23,14 @@ import java.util.Map;
 
 import org.apache.asterix.external.api.IAdapterFactory;
 import org.apache.asterix.external.api.IDataSourceAdapter;
+import org.apache.asterix.external.api.IExternalDataSourceFactory;
 import org.apache.asterix.external.api.ITupleForwarder;
 import org.apache.asterix.external.parser.ADMDataParser;
 import org.apache.asterix.external.util.DataflowUtils;
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.external.util.FeedUtils;
 import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -48,14 +48,17 @@ public class TestTypedAdapterFactory implements IAdapterFactory {
 
     private Map<String, String> configuration;
 
+    private transient AlgebricksAbsolutePartitionConstraint clusterLocations;
+
     @Override
     public String getAlias() {
         return "test_typed";
     }
 
     @Override
-    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
-        return new AlgebricksCountPartitionConstraint(1);
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception {
+        clusterLocations = IExternalDataSourceFactory.getPartitionConstraints(clusterLocations, 1);
+        return clusterLocations;
     }
 
     @Override


Mime
View raw message