asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject [08/13] incubator-asterixdb git commit: Improve Error Handling in Local Directory Feeds
Date Sat, 26 Mar 2016 20:27:39 GMT
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/121e1d9a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamProviderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamProviderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamProviderFactory.java
deleted file mode 100644
index a301c1a..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamProviderFactory.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.stream.factory;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.ServerSocket;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.external.api.IInputStreamProvider;
-import org.apache.asterix.external.api.IInputStreamProviderFactory;
-import org.apache.asterix.external.input.stream.provider.SocketServerInputStreamProvider;
-import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.om.util.AsterixRuntimeUtil;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class SocketServerInputStreamProviderFactory implements IInputStreamProviderFactory {
-
-    private static final long serialVersionUID = 1L;
-    private List<Pair<String, Integer>> sockets;
-    private Mode mode = Mode.IP;
-
-    public static enum Mode {
-        NC,
-        IP
-    }
-
-    @Override
-    public void configure(Map<String, String> configuration) throws AsterixException {
-        try {
-            sockets = new ArrayList<Pair<String, Integer>>();
-            String modeValue = configuration.get(ExternalDataConstants.KEY_MODE);
-            if (modeValue != null) {
-                mode = Mode.valueOf(modeValue.trim().toUpperCase());
-            }
-            String socketsValue = configuration.get(ExternalDataConstants.KEY_SOCKETS);
-            if (socketsValue == null) {
-                throw new IllegalArgumentException(
-                        "\'sockets\' parameter not specified as part of adapter configuration");
-            }
-            Map<InetAddress, Set<String>> ncMap;
-            ncMap = AsterixRuntimeUtil.getNodeControllerMap();
-            List<String> ncs = AsterixRuntimeUtil.getAllNodeControllers();
-            String[] socketsArray = socketsValue.split(",");
-            Random random = new Random();
-            for (String socket : socketsArray) {
-                String[] socketTokens = socket.split(":");
-                String host = socketTokens[0].trim();
-                int port = Integer.parseInt(socketTokens[1].trim());
-                Pair<String, Integer> p = null;
-                switch (mode) {
-                    case IP:
-                        Set<String> ncsOnIp = ncMap.get(InetAddress.getByName(host));
-                        if ((ncsOnIp == null) || ncsOnIp.isEmpty()) {
-                            throw new IllegalArgumentException("Invalid host " + host
-                                    + " as it is not part of the AsterixDB cluster. Valid choices are "
-                                    + StringUtils.join(ncMap.keySet(), ", "));
-                        }
-                        String[] ncArray = ncsOnIp.toArray(new String[] {});
-                        String nc = ncArray[random.nextInt(ncArray.length)];
-                        p = new Pair<String, Integer>(nc, port);
-                        break;
-
-                    case NC:
-                        p = new Pair<String, Integer>(host, port);
-                        if (!ncs.contains(host)) {
-                            throw new IllegalArgumentException("Invalid NC " + host
-                                    + " as it is not part of the AsterixDB cluster. Valid choices are "
-                                    + StringUtils.join(ncs, ", "));
-
-                        }
-                        break;
-                }
-                sockets.add(p);
-            }
-        } catch (Exception e) {
-            throw new AsterixException(e);
-        }
-    }
-
-    @Override
-    public synchronized IInputStreamProvider createInputStreamProvider(IHyracksTaskContext ctx, int partition)
-            throws HyracksDataException {
-        try {
-            Pair<String, Integer> socket = sockets.get(partition);
-            ServerSocket server;
-            server = new ServerSocket(socket.second);
-            return new SocketServerInputStreamProvider(server);
-        } catch (IOException e) {
-            throw new HyracksDataException(e);
-        }
-    }
-
-    @Override
-    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
-        List<String> locations = new ArrayList<String>();
-        for (Pair<String, Integer> socket : sockets) {
-            locations.add(socket.first);
-        }
-        return new AlgebricksAbsolutePartitionConstraint(locations.toArray(new String[] {}));
-    }
-
-    public List<Pair<String, Integer>> getSockets() {
-        return sockets;
-    }
-
-    @Override
-    public DataSourceType getDataSourceType() {
-        return DataSourceType.STREAM;
-    }
-
-    @Override
-    public boolean isIndexible() {
-        return false;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/121e1d9a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
new file mode 100644
index 0000000..8052822
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream.factory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.api.IInputStreamFactory;
+import org.apache.asterix.external.input.stream.TwitterFirehoseInputStream;
+import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Factory for {@link TwitterFirehoseInputStream}. The stream simulates a
+ * twitter firehose with tweets being "pushed" into Asterix at a
+ * configurable rate measured in terms of TPS (tweets/second). The stream of
+ * tweets lasts for a configurable duration (measured in seconds).
+ */
+public class TwitterFirehoseStreamFactory implements IInputStreamFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Degree of parallelism for feed ingestion activity. When absent, defaults
+     * to the number of configured ingestion locations, or to 1 if none are
+     * configured.
+     **/
+    private static final String KEY_INGESTION_CARDINALITY = "ingestion-cardinality";
+
+    /**
+     * Comma-separated absolute locations where ingestion operator instances
+     * will be placed.
+     **/
+    private static final String KEY_INGESTION_LOCATIONS = "ingestion-location";
+
+    private Map<String, String> configuration;
+
+    /**
+     * Builds the absolute partition constraint for the ingestion operator.
+     * Locations come from {@code ingestion-location} when present, otherwise
+     * from the cluster's participant nodes; they are assigned round-robin
+     * until the requested number of instances has been placed.
+     *
+     * @return the constraint listing one location per ingestion instance
+     * @throws NumberFormatException
+     *             if {@code ingestion-cardinality} is not an integer
+     * @throws IllegalStateException
+     *             if instances are requested but no location is available
+     */
+    @Override
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
+        String ingestionCardinalityParam = configuration.get(KEY_INGESTION_CARDINALITY);
+        String ingestionLocationParam = configuration.get(KEY_INGESTION_LOCATIONS);
+        String[] locations = null;
+        if (ingestionLocationParam != null) {
+            locations = ingestionLocationParam.split(",");
+            // Tolerate "nc1, nc2": surrounding blanks are not part of a node name.
+            for (int i = 0; i < locations.length; i++) {
+                locations[i] = locations[i].trim();
+            }
+        }
+        int count = locations != null ? locations.length : 1;
+        if (ingestionCardinalityParam != null) {
+            count = Integer.parseInt(ingestionCardinalityParam.trim());
+        }
+
+        String[] availableLocations = locations != null ? locations
+                : AsterixClusterProperties.INSTANCE.getParticipantNodes().toArray(new String[0]);
+        if (count > 0 && availableLocations.length == 0) {
+            // Fail with a clear message instead of an ArrayIndexOutOfBoundsException below.
+            throw new IllegalStateException("No ingestion location available for the twitter firehose feed");
+        }
+        List<String> chosenLocations = new ArrayList<>(Math.max(count, 0));
+        for (int i = 0; i < count; i++) {
+            // Round-robin over the available locations.
+            chosenLocations.add(availableLocations[i % availableLocations.length]);
+        }
+        return new AlgebricksAbsolutePartitionConstraint(chosenLocations.toArray(new String[0]));
+    }
+
+    /** Returns {@code DataSourceType.STREAM}. */
+    @Override
+    public DataSourceType getDataSourceType() {
+        return DataSourceType.STREAM;
+    }
+
+    /**
+     * Keeps a reference to the adapter configuration; it is read later when
+     * computing the partition constraint and when creating input streams.
+     */
+    @Override
+    public void configure(Map<String, String> configuration) {
+        this.configuration = configuration;
+    }
+
+    /** Always {@code false} for this factory. */
+    @Override
+    public boolean isIndexible() {
+        return false;
+    }
+
+    /**
+     * Creates the firehose stream for one partition.
+     *
+     * @throws HyracksDataException
+     *             wrapping any {@link IOException} raised while creating the stream
+     */
+    @Override
+    public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition) throws HyracksDataException {
+        try {
+            return new TwitterFirehoseInputStream(configuration, ctx, partition);
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/121e1d9a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamProviderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamProviderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamProviderFactory.java
deleted file mode 100644
index 7b09ade..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamProviderFactory.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.stream.factory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.asterix.external.api.IInputStreamProvider;
-import org.apache.asterix.external.api.IInputStreamProviderFactory;
-import org.apache.asterix.external.input.stream.provider.TwitterFirehoseInputStreamProvider;
-import org.apache.asterix.om.util.AsterixClusterProperties;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-/**
- * Factory class for creating @see{TwitterFirehoseFeedAdapter}. The adapter
- * simulates a twitter firehose with tweets being "pushed" into Asterix at a
- * configurable rate measured in terms of TPS (tweets/second). The stream of
- * tweets lasts for a configurable duration (measured in seconds).
- */
-public class TwitterFirehoseStreamProviderFactory implements IInputStreamProviderFactory {
-
-    private static final long serialVersionUID = 1L;
-
-    /**
-     * Degree of parallelism for feed ingestion activity. Defaults to 1. This
-     * determines the count constraint for the ingestion operator.
-     **/
-    private static final String KEY_INGESTION_CARDINALITY = "ingestion-cardinality";
-
-    /**
-     * The absolute locations where ingestion operator instances will be placed.
-     **/
-    private static final String KEY_INGESTION_LOCATIONS = "ingestion-location";
-
-    private Map<String, String> configuration;
-
-    @Override
-    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
-        String ingestionCardinalityParam = configuration.get(KEY_INGESTION_CARDINALITY);
-        String ingestionLocationParam = configuration.get(KEY_INGESTION_LOCATIONS);
-        String[] locations = null;
-        if (ingestionLocationParam != null) {
-            locations = ingestionLocationParam.split(",");
-        }
-        int count = locations != null ? locations.length : 1;
-        if (ingestionCardinalityParam != null) {
-            count = Integer.parseInt(ingestionCardinalityParam);
-        }
-
-        List<String> chosenLocations = new ArrayList<String>();
-        String[] availableLocations = locations != null ? locations
-                : AsterixClusterProperties.INSTANCE.getParticipantNodes().toArray(new String[] {});
-        for (int i = 0, k = 0; i < count; i++, k = (k + 1) % availableLocations.length) {
-            chosenLocations.add(availableLocations[k]);
-        }
-        return new AlgebricksAbsolutePartitionConstraint(chosenLocations.toArray(new String[] {}));
-    }
-
-    @Override
-    public DataSourceType getDataSourceType() {
-        return DataSourceType.STREAM;
-    }
-
-    @Override
-    public void configure(Map<String, String> configuration) {
-        this.configuration = configuration;
-    }
-
-    @Override
-    public boolean isIndexible() {
-        return false;
-    }
-
-    @Override
-    public IInputStreamProvider createInputStreamProvider(IHyracksTaskContext ctx, int partition)
-            throws HyracksDataException {
-        return new TwitterFirehoseInputStreamProvider(configuration, ctx, partition);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/121e1d9a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/HDFSInputStreamProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/HDFSInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/HDFSInputStreamProvider.java
deleted file mode 100644
index e1ab331..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/HDFSInputStreamProvider.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.stream.provider;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.asterix.external.api.IInputStreamProvider;
-import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
-import org.apache.asterix.external.indexing.ExternalFile;
-import org.apache.asterix.external.input.record.reader.hdfs.HDFSRecordReader;
-import org.apache.asterix.external.input.stream.AInputStream;
-import org.apache.asterix.external.provider.ExternalIndexerProvider;
-import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.FeedLogManager;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-
-public class HDFSInputStreamProvider<K> extends HDFSRecordReader<K, Text> implements IInputStreamProvider {
-
-    public HDFSInputStreamProvider(boolean read[], InputSplit[] inputSplits, String[] readSchedule, String nodeName,
-            JobConf conf, Map<String, String> configuration, List<ExternalFile> snapshot) throws Exception {
-        super(read, inputSplits, readSchedule, nodeName, conf, snapshot,
-                snapshot == null ? null : ExternalIndexerProvider.getIndexer(configuration));
-        value = new Text();
-        if (snapshot != null) {
-            if (currentSplitIndex < snapshot.size()) {
-                indexer.reset(this);
-            }
-        }
-    }
-
-    @Override
-    public AInputStream getInputStream() {
-        return new HDFSInputStream();
-    }
-
-    private class HDFSInputStream extends AInputStream {
-        int pos = 0;
-
-        @Override
-        public int read() throws IOException {
-            if (value.getLength() < pos) {
-                if (!readMore()) {
-                    return -1;
-                }
-            } else if (value.getLength() == pos) {
-                pos++;
-                return ExternalDataConstants.BYTE_LF;
-            }
-            return value.getBytes()[pos++];
-        }
-
-        private int readRecord(byte[] buffer, int offset, int len) {
-            int actualLength = value.getLength() + 1;
-            if ((actualLength - pos) > len) {
-                //copy partial record
-                System.arraycopy(value.getBytes(), pos, buffer, offset, len);
-                pos += len;
-                return len;
-            } else {
-                int numBytes = value.getLength() - pos;
-                System.arraycopy(value.getBytes(), pos, buffer, offset, numBytes);
-                buffer[offset + numBytes] = ExternalDataConstants.LF;
-                pos += numBytes;
-                numBytes++;
-                return numBytes;
-            }
-        }
-
-        @Override
-        public int read(byte[] buffer, int offset, int len) throws IOException {
-            if (value.getLength() > pos) {
-                return readRecord(buffer, offset, len);
-            }
-            if (!readMore()) {
-                return -1;
-            }
-            return readRecord(buffer, offset, len);
-        }
-
-        private boolean readMore() throws IOException {
-            try {
-                pos = 0;
-                return HDFSInputStreamProvider.this.hasNext();
-            } catch (Exception e) {
-                throw new IOException(e);
-            }
-        }
-
-        @Override
-        public boolean skipError() throws Exception {
-            return true;
-        }
-
-        @Override
-        public boolean stop() throws Exception {
-            return false;
-        }
-
-        @Override
-        public void setFeedLogManager(FeedLogManager logManager) {
-        }
-
-        @Override
-        public void setController(AbstractFeedDataFlowController controller) {
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/121e1d9a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/LocalFSInputStreamProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/LocalFSInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/LocalFSInputStreamProvider.java
deleted file mode 100644
index fbe6035..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/LocalFSInputStreamProvider.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.stream.provider;
-
-import java.nio.file.Path;
-import java.util.Map;
-
-import org.apache.asterix.external.api.IInputStreamProvider;
-import org.apache.asterix.external.input.stream.AInputStream;
-import org.apache.asterix.external.input.stream.LocalFileSystemInputStream;
-import org.apache.asterix.external.util.FeedLogManager;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.std.file.FileSplit;
-
-public class LocalFSInputStreamProvider implements IInputStreamProvider {
-
-    private final String expression;
-    private final boolean isFeed;
-    private final Path path;
-    private FeedLogManager feedLogManager;
-
-    public LocalFSInputStreamProvider(final FileSplit[] fileSplits, final IHyracksTaskContext ctx,
-            final Map<String, String> configuration, final int partition, final String expression,
-            final boolean isFeed) {
-        this.expression = expression;
-        this.isFeed = isFeed;
-        this.path = fileSplits[partition].getLocalFile().getFile().toPath();
-    }
-
-    @Override
-    public AInputStream getInputStream() throws HyracksDataException {
-        final LocalFileSystemInputStream stream = new LocalFileSystemInputStream(path, expression, isFeed);
-        stream.setFeedLogManager(feedLogManager);
-        return stream;
-    }
-
-    @Override
-    public void setFeedLogManager(final FeedLogManager feedLogManager) {
-        this.feedLogManager = feedLogManager;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/121e1d9a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/SocketClientInputStreamProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/SocketClientInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/SocketClientInputStreamProvider.java
deleted file mode 100644
index f842638..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/SocketClientInputStreamProvider.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.stream.provider;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.Socket;
-
-import org.apache.asterix.external.api.IInputStreamProvider;
-import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
-import org.apache.asterix.external.input.stream.AInputStream;
-import org.apache.asterix.external.util.FeedLogManager;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.log4j.Logger;
-
-public class SocketClientInputStreamProvider implements IInputStreamProvider {
-
-    private static final Logger LOGGER = Logger.getLogger(SocketClientInputStreamProvider.class.getName());
-    private final Socket socket;
-
-    public SocketClientInputStreamProvider(Pair<String, Integer> ipAndPort) throws HyracksDataException {
-        try {
-            socket = new Socket(ipAndPort.first, ipAndPort.second);
-        } catch (IOException e) {
-            LOGGER.error(
-                    "Problem in creating socket against host " + ipAndPort.first + " on the port " + ipAndPort.second,
-                    e);
-            throw new HyracksDataException(e);
-        }
-    }
-
-    @Override
-    public AInputStream getInputStream() throws HyracksDataException {
-        InputStream in;
-        try {
-            in = socket.getInputStream();
-        } catch (IOException e) {
-            throw new HyracksDataException(e);
-        }
-        return new AInputStream() {
-            @Override
-            public int read() throws IOException {
-                throw new IOException("method not supported. use read(byte[] buffer, int offset, int length) instead");
-            }
-
-            @Override
-            public int read(byte[] buffer, int offset, int length) throws IOException {
-                return in.read(buffer, offset, length);
-            }
-
-            @Override
-            public boolean stop() throws Exception {
-                if (!socket.isClosed()) {
-                    try {
-                        in.close();
-                    } finally {
-                        socket.close();
-                    }
-                }
-                return true;
-            }
-
-            @Override
-            public boolean skipError() throws Exception {
-                return false;
-            }
-
-            @Override
-            public void setFeedLogManager(FeedLogManager logManager) {
-            }
-
-            @Override
-            public void setController(AbstractFeedDataFlowController controller) {
-            }
-        };
-    }
-
-    @Override
-    public void setFeedLogManager(FeedLogManager feedLogManager) {
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/121e1d9a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/SocketServerInputStreamProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/SocketServerInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/SocketServerInputStreamProvider.java
deleted file mode 100644
index 64f0342..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/SocketServerInputStreamProvider.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.stream.provider;
-
-import java.net.ServerSocket;
-
-import org.apache.asterix.external.api.IInputStreamProvider;
-import org.apache.asterix.external.input.stream.AInputStream;
-import org.apache.asterix.external.input.stream.SocketServerInputStream;
-import org.apache.asterix.external.util.FeedLogManager;
-
-public class SocketServerInputStreamProvider implements IInputStreamProvider {
-    private final ServerSocket server;
-
-    public SocketServerInputStreamProvider(ServerSocket server) {
-        this.server = server;
-    }
-
-    @Override
-    public AInputStream getInputStream() {
-        return new SocketServerInputStream(server);
-    }
-
-    @Override
-    public void setFeedLogManager(FeedLogManager feedLogManager) {
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/121e1d9a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java
deleted file mode 100644
index a979262..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.stream.provider;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.PipedInputStream;
-import java.io.PipedOutputStream;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.external.api.IInputStreamProvider;
-import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
-import org.apache.asterix.external.input.stream.AInputStream;
-import org.apache.asterix.external.util.FeedLogManager;
-import org.apache.asterix.external.util.TweetGenerator;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class TwitterFirehoseInputStreamProvider implements IInputStreamProvider {
-
-    private static final Logger LOGGER = Logger.getLogger(TwitterFirehoseInputStreamProvider.class.getName());
-
-    private final ExecutorService executorService;
-
-    private final PipedOutputStream outputStream;
-
-    private final PipedInputStream inputStream;
-
-    private final TwitterServer twitterServer;
-
-    public TwitterFirehoseInputStreamProvider(Map<String, String> configuration, IHyracksTaskContext ctx, int partition)
-            throws HyracksDataException {
-        try {
-            executorService = Executors.newCachedThreadPool();
-            outputStream = new PipedOutputStream();
-            inputStream = new PipedInputStream(outputStream);
-            twitterServer = new TwitterServer(configuration, partition, outputStream, executorService, inputStream);
-        } catch (IOException e) {
-            throw new HyracksDataException(e);
-        }
-    }
-
-    @Override
-    public AInputStream getInputStream() {
-        return twitterServer;
-    }
-
-    private static class TwitterServer extends AInputStream {
-        private final DataProvider dataProvider;
-        private final ExecutorService executorService;
-        private final InputStream in;
-        private boolean started;
-
-        public TwitterServer(Map<String, String> configuration, int partition, OutputStream os,
-                ExecutorService executorService, InputStream in) {
-            dataProvider = new DataProvider(configuration, partition, os);
-            this.executorService = executorService;
-            this.in = in;
-            this.started = false;
-        }
-
-        @Override
-        public boolean stop() throws IOException {
-            dataProvider.stop();
-            return true;
-        }
-
-        public synchronized void start() {
-            if (!started) {
-                executorService.execute(dataProvider);
-                started = true;
-            }
-        }
-
-        @Override
-        public boolean skipError() throws Exception {
-            return false;
-        }
-
-        @Override
-        public int read() throws IOException {
-            if (!started) {
-                start();
-            }
-            return in.read();
-        }
-
-        @Override
-        public int read(byte b[], int off, int len) throws IOException {
-            if (!started) {
-                start();
-                started = true;
-            }
-            return in.read(b, off, len);
-        }
-
-        @Override
-        public void setFeedLogManager(FeedLogManager logManager) {
-        }
-
-        @Override
-        public void setController(AbstractFeedDataFlowController controller) {
-        }
-    }
-
-    private static class DataProvider implements Runnable {
-
-        public static final String KEY_MODE = "mode";
-
-        private final TweetGenerator tweetGenerator;
-        private boolean continuePush = true;
-        private int batchSize;
-        private final Mode mode;
-        private final OutputStream os;
-
-        public static enum Mode {
-            AGGRESSIVE,
-            CONTROLLED
-        }
-
-        public DataProvider(Map<String, String> configuration, int partition, OutputStream os) {
-            this.tweetGenerator = new TweetGenerator(configuration, partition);
-            this.tweetGenerator.registerSubscriber(os);
-            this.os = os;
-            mode = configuration.get(KEY_MODE) != null ? Mode.valueOf(configuration.get(KEY_MODE).toUpperCase())
-                    : Mode.AGGRESSIVE;
-            switch (mode) {
-                case CONTROLLED:
-                    String tpsValue = configuration.get(TweetGenerator.KEY_TPS);
-                    if (tpsValue == null) {
-                        throw new IllegalArgumentException("TPS value not configured. use tps=<value>");
-                    }
-                    batchSize = Integer.parseInt(tpsValue);
-                    break;
-                case AGGRESSIVE:
-                    batchSize = 5000;
-                    break;
-            }
-        }
-
-        @Override
-        public void run() {
-            boolean moreData = true;
-            long startBatch;
-            long endBatch;
-            while (true) {
-                try {
-                    while (moreData && continuePush) {
-                        switch (mode) {
-                            case AGGRESSIVE:
-                                moreData = tweetGenerator.generateNextBatch(batchSize);
-                                break;
-                            case CONTROLLED:
-                                startBatch = System.currentTimeMillis();
-                                moreData = tweetGenerator.generateNextBatch(batchSize);
-                                endBatch = System.currentTimeMillis();
-                                if ((endBatch - startBatch) < 1000) {
-                                    Thread.sleep(1000 - (endBatch - startBatch));
-                                }
-                                break;
-                        }
-                    }
-                    os.close();
-                    break;
-                } catch (Exception e) {
-                    if (LOGGER.isLoggable(Level.WARNING)) {
-                        LOGGER.warning("Exception in adapter " + e.getMessage());
-                    }
-                }
-            }
-        }
-
-        public void stop() {
-            continuePush = false;
-        }
-    }
-
-    @Override
-    public void setFeedLogManager(FeedLogManager feedLogManager) {
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/121e1d9a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
index 8475e45..9485b77 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
@@ -19,11 +19,9 @@
 package org.apache.asterix.external.operators;
 
 import java.util.Map;
-import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
-import org.apache.asterix.external.feed.api.IFeedLifecycleListener.ConnectionLocation;
 import org.apache.asterix.external.feed.api.IFeedManager;
 import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
 import org.apache.asterix.external.feed.api.IFeedSubscriptionManager;
@@ -52,7 +50,7 @@ public class FeedCollectOperatorDescriptor extends AbstractSingleActivityOperato
     private static final long serialVersionUID = 1L;
     private static final Logger LOGGER = Logger.getLogger(FeedCollectOperatorDescriptor.class.getName());
 
-    /** The type associated with the ADM data output from the feed adaptor */
+    /** The type associated with the ADM data output from (the feed adapter OR the compute operator) */
     private final IAType outputType;
 
     /** unique identifier for a feed instance. */
@@ -67,12 +65,12 @@ public class FeedCollectOperatorDescriptor extends AbstractSingleActivityOperato
     /** The source feed from which the feed derives its data from. **/
     private final FeedId sourceFeedId;
 
-    /** The subscription location at which the recipient feed receives tuples from the source feed **/
-    private final ConnectionLocation subscriptionLocation;
+    /** The subscription location at which the recipient feed receives tuples from the source feed {SOURCE_FEED_INTAKE_STAGE , SOURCE_FEED_COMPUTE_STAGE} **/
+    private final FeedRuntimeType subscriptionLocation;
 
     public FeedCollectOperatorDescriptor(JobSpecification spec, FeedConnectionId feedConnectionId, FeedId sourceFeedId,
             ARecordType atype, RecordDescriptor rDesc, Map<String, String> feedPolicyProperties,
-            ConnectionLocation subscriptionLocation) {
+            FeedRuntimeType subscriptionLocation) {
         super(spec, 0, 1);
         recordDescriptors[0] = rDesc;
         this.outputType = atype;
@@ -85,45 +83,25 @@ public class FeedCollectOperatorDescriptor extends AbstractSingleActivityOperato
     @Override
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
-                    throws HyracksDataException {
+            throws HyracksDataException {
         IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
                 .getApplicationContext().getApplicationObject();
         this.subscriptionManager = ((IFeedManager) runtimeCtx.getFeedManager()).getFeedSubscriptionManager();
         ISubscribableRuntime sourceRuntime = null;
-        IOperatorNodePushable nodePushable = null;
+        SubscribableFeedRuntimeId feedSubscribableRuntimeId = new SubscribableFeedRuntimeId(sourceFeedId,
+                subscriptionLocation, partition);
         switch (subscriptionLocation) {
-            case SOURCE_FEED_INTAKE_STAGE:
-                try {
-                    SubscribableFeedRuntimeId feedSubscribableRuntimeId = new SubscribableFeedRuntimeId(sourceFeedId,
-                            FeedRuntimeType.INTAKE, partition);
-                    sourceRuntime = getIntakeRuntime(feedSubscribableRuntimeId);
-                    if (sourceRuntime == null) {
-                        throw new HyracksDataException(
-                                "Source intake task not found for source feed id " + sourceFeedId);
-                    }
-                    nodePushable = new FeedCollectOperatorNodePushable(ctx, sourceFeedId, connectionId,
-                            feedPolicyProperties, partition, nPartitions, sourceRuntime);
-
-                } catch (Exception exception) {
-                    if (LOGGER.isLoggable(Level.SEVERE)) {
-                        LOGGER.severe("Initialization of the feed adaptor failed with exception " + exception);
-                    }
-                    throw new HyracksDataException("Initialization of the feed adapter failed", exception);
-                }
+            case INTAKE:
+                sourceRuntime = getIntakeRuntime(feedSubscribableRuntimeId);
                 break;
-            case SOURCE_FEED_COMPUTE_STAGE:
-                SubscribableFeedRuntimeId feedSubscribableRuntimeId = new SubscribableFeedRuntimeId(sourceFeedId,
-                        FeedRuntimeType.COMPUTE, partition);
+            case COMPUTE:
                 sourceRuntime = subscriptionManager.getSubscribableRuntime(feedSubscribableRuntimeId);
-                if (sourceRuntime == null) {
-                    throw new HyracksDataException("Source compute task not found for source feed id " + sourceFeedId
-                            + " " + FeedRuntimeType.COMPUTE + "[" + partition + "]");
-                }
-                nodePushable = new FeedCollectOperatorNodePushable(ctx, sourceFeedId, connectionId,
-                        feedPolicyProperties, partition, nPartitions, sourceRuntime);
                 break;
+            default:
+                throw new HyracksDataException("Can't subscirbe to FeedRuntime with Type: " + subscriptionLocation);
         }
-        return nodePushable;
+        return new FeedCollectOperatorNodePushable(ctx, sourceFeedId, connectionId, feedPolicyProperties, partition,
+                nPartitions, sourceRuntime);
     }
 
     public FeedConnectionId getFeedConnectionId() {
@@ -150,7 +128,7 @@ public class FeedCollectOperatorDescriptor extends AbstractSingleActivityOperato
         return (IngestionRuntime) subscriptionManager.getSubscribableRuntime(subscribableRuntimeId);
     }
 
-    public ConnectionLocation getSubscriptionLocation() {
+    public FeedRuntimeType getSubscriptionLocation() {
         return subscriptionLocation;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/121e1d9a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
index 7901f03..87e1edb 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
@@ -46,10 +46,9 @@ import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 
 /**
- * The runtime for @see{FeedIntakeOperationDescriptor}
+ * The first operator in a collect job in a feed.
  */
 public class FeedCollectOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
-
     private static Logger LOGGER = Logger.getLogger(FeedCollectOperatorNodePushable.class.getName());
 
     private final int partition;
@@ -74,19 +73,16 @@ public class FeedCollectOperatorNodePushable extends AbstractUnaryOutputSourceOp
         this.connectionId = feedConnectionId;
         this.sourceRuntime = sourceRuntime;
         this.feedPolicy = feedPolicy;
-        policyAccessor = new FeedPolicyAccessor(feedPolicy);
-        IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
-                .getApplicationContext().getApplicationObject();
-        this.feedManager = (IFeedManager) runtimeCtx.getFeedManager();
+        this.policyAccessor = new FeedPolicyAccessor(feedPolicy);
+        this.feedManager = (IFeedManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
+                .getApplicationObject()).getFeedManager();
     }
 
     @Override
     public void initialize() throws HyracksDataException {
         try {
             outputRecordDescriptor = recordDesc;
-            FeedRuntimeType sourceRuntimeType = ((SubscribableFeedRuntimeId) sourceRuntime.getRuntimeId())
-                    .getFeedRuntimeType();
-            switch (sourceRuntimeType) {
+            switch (((SubscribableFeedRuntimeId) sourceRuntime.getRuntimeId()).getFeedRuntimeType()) {
                 case INTAKE:
                     handleCompleteConnection();
                     break;
@@ -94,7 +90,8 @@ public class FeedCollectOperatorNodePushable extends AbstractUnaryOutputSourceOp
                     handlePartialConnection();
                     break;
                 default:
-                    throw new IllegalStateException("Invalid source type " + sourceRuntimeType);
+                    throw new IllegalStateException("Invalid source type "
+                            + ((SubscribableFeedRuntimeId) sourceRuntime.getRuntimeId()).getFeedRuntimeType());
             }
 
             State state = collectRuntime.waitTillCollectionOver();

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/121e1d9a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AbstractDataParser.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AbstractDataParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AbstractDataParser.java
index cfd1b9c..2517166 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AbstractDataParser.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AbstractDataParser.java
@@ -114,7 +114,7 @@ public abstract class AbstractDataParser implements IDataParser {
             .getSerializerDeserializer(BuiltinType.ADOUBLE);
     @SuppressWarnings("unchecked")
     protected ISerializerDeserializer<AString> stringSerde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.ASTRING);
+            .getAStringSerializerDeserializer();
     @SuppressWarnings("unchecked")
     protected ISerializerDeserializer<ABinary> binarySerde = AqlSerializerDeserializerProvider.INSTANCE
             .getSerializerDeserializer(BuiltinType.ABINARY);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/121e1d9a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
index 159ea73..d362201 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
@@ -22,12 +22,12 @@ import java.io.IOException;
 import java.util.Map;
 
 import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.asterix.external.api.IDataFlowController;
 import org.apache.asterix.external.api.IDataParserFactory;
 import org.apache.asterix.external.api.IExternalDataSourceFactory;
 import org.apache.asterix.external.api.IIndexingDatasource;
-import org.apache.asterix.external.api.IInputStreamProvider;
-import org.apache.asterix.external.api.IInputStreamProviderFactory;
+import org.apache.asterix.external.api.IInputStreamFactory;
 import org.apache.asterix.external.api.IRecordDataParser;
 import org.apache.asterix.external.api.IRecordDataParserFactory;
 import org.apache.asterix.external.api.IRecordReader;
@@ -44,7 +44,6 @@ import org.apache.asterix.external.dataflow.FeedWithMetaDataFlowController;
 import org.apache.asterix.external.dataflow.IndexingDataFlowController;
 import org.apache.asterix.external.dataflow.RecordDataFlowController;
 import org.apache.asterix.external.dataflow.StreamDataFlowController;
-import org.apache.asterix.external.input.stream.AInputStream;
 import org.apache.asterix.external.parser.RecordWithMetadataParser;
 import org.apache.asterix.external.util.DataflowUtils;
 import org.apache.asterix.external.util.ExternalDataUtils;
@@ -106,17 +105,15 @@ public class DataflowControllerProvider {
                                 recordReader, 1);
                     }
                 case STREAM:
-                    IInputStreamProviderFactory streamProviderFactory = (IInputStreamProviderFactory) dataSourceFactory;
-                    IInputStreamProvider streamProvider = streamProviderFactory.createInputStreamProvider(ctx,
-                            partition);
+                    IInputStreamFactory streamFactory = (IInputStreamFactory) dataSourceFactory;
+                    AsterixInputStream stream = streamFactory.createInputStream(ctx, partition);
                     IStreamDataParserFactory streamParserFactory = (IStreamDataParserFactory) dataParserFactory;
                     IStreamDataParser streamParser = streamParserFactory.createInputStreamParser(ctx, partition);
-                    AInputStream inputStream = streamProvider.getInputStream();
-                    streamParser.setInputStream(inputStream);
+                    streamParser.setInputStream(stream);
                     if (isFeed) {
                         return new FeedStreamDataFlowController(ctx,
                                 (FeedTupleForwarder) DataflowUtils.getTupleForwarder(configuration, feedLogManager),
-                                feedLogManager, FeedUtils.getNumOfFields(configuration), streamParser, inputStream);
+                                feedLogManager, FeedUtils.getNumOfFields(configuration), streamParser, stream);
                     } else {
                         return new StreamDataFlowController(ctx, DataflowUtils.getTupleForwarder(configuration, null),
                                 streamParser);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/121e1d9a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
index e9307e5..f8d64e0 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
@@ -23,7 +23,7 @@ import java.util.Map;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.external.api.IExternalDataSourceFactory;
 import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
-import org.apache.asterix.external.api.IInputStreamProviderFactory;
+import org.apache.asterix.external.api.IInputStreamFactory;
 import org.apache.asterix.external.api.IRecordReaderFactory;
 import org.apache.asterix.external.input.HDFSDataSourceFactory;
 import org.apache.asterix.external.input.record.reader.RecordWithPKTestReaderFactory;
@@ -33,9 +33,9 @@ import org.apache.asterix.external.input.record.reader.stream.EmptyLineSeparated
 import org.apache.asterix.external.input.record.reader.stream.LineRecordReaderFactory;
 import org.apache.asterix.external.input.record.reader.stream.SemiStructuredRecordReaderFactory;
 import org.apache.asterix.external.input.record.reader.twitter.TwitterRecordReaderFactory;
-import org.apache.asterix.external.input.stream.factory.LocalFSInputStreamProviderFactory;
-import org.apache.asterix.external.input.stream.factory.SocketServerInputStreamProviderFactory;
-import org.apache.asterix.external.input.stream.factory.TwitterFirehoseStreamProviderFactory;
+import org.apache.asterix.external.input.stream.factory.LocalFSInputStreamFactory;
+import org.apache.asterix.external.input.stream.factory.SocketServerInputStreamFactory;
+import org.apache.asterix.external.input.stream.factory.TwitterFirehoseStreamFactory;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataUtils;
 
@@ -53,9 +53,9 @@ public class DatasourceFactoryProvider {
         }
     }
 
-    public static IInputStreamProviderFactory getInputStreamFactory(String streamSource,
+    public static IInputStreamFactory getInputStreamFactory(String streamSource,
             Map<String, String> configuration) throws AsterixException {
-        IInputStreamProviderFactory streamSourceFactory;
+        IInputStreamFactory streamSourceFactory;
         if (ExternalDataUtils.isExternal(streamSource)) {
             String dataverse = ExternalDataUtils.getDataverse(configuration);
             streamSourceFactory = ExternalDataUtils.createExternalInputStreamFactory(dataverse, streamSource);
@@ -65,17 +65,17 @@ public class DatasourceFactoryProvider {
                     streamSourceFactory = new HDFSDataSourceFactory();
                     break;
                 case ExternalDataConstants.STREAM_LOCAL_FILESYSTEM:
-                    streamSourceFactory = new LocalFSInputStreamProviderFactory();
+                    streamSourceFactory = new LocalFSInputStreamFactory();
                     break;
                 case ExternalDataConstants.STREAM_SOCKET:
                 case ExternalDataConstants.ALIAS_SOCKET_ADAPTER:
-                    streamSourceFactory = new SocketServerInputStreamProviderFactory();
+                    streamSourceFactory = new SocketServerInputStreamFactory();
                     break;
                 case ExternalDataConstants.STREAM_SOCKET_CLIENT:
-                    streamSourceFactory = new SocketServerInputStreamProviderFactory();
+                    streamSourceFactory = new SocketServerInputStreamFactory();
                     break;
                 case ExternalDataConstants.ALIAS_TWITTER_FIREHOSE_ADAPTER:
-                    streamSourceFactory = new TwitterFirehoseStreamProviderFactory();
+                    streamSourceFactory = new TwitterFirehoseStreamFactory();
                     break;
                 default:
                     throw new AsterixException("unknown input stream factory");
@@ -90,7 +90,7 @@ public class DatasourceFactoryProvider {
             return ExternalDataUtils.createExternalRecordReaderFactory(configuration);
         }
         String parser = configuration.get(ExternalDataConstants.KEY_PARSER);
-        IInputStreamProviderFactory inputStreamFactory;
+        IInputStreamFactory inputStreamFactory;
         switch (parser) {
             case ExternalDataConstants.FORMAT_ADM:
             case ExternalDataConstants.FORMAT_JSON:

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/121e1d9a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 0e68698..a02152b 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -206,8 +206,9 @@ public class ExternalDataConstants {
      * Size default values
      */
     public static final int DEFAULT_BUFFER_SIZE = 4096;
-    public static final int DEFAULT_BUFFER_INCREMENT = 4096;
+    public static final int DEFAULT_BUFFER_INCREMENT = 2048;
     public static final int DEFAULT_QUEUE_SIZE = 64;
+    public static final int MAX_RECORD_SIZE = 32000000;
 
     /**
      * Expected parameter values
@@ -226,4 +227,6 @@ public class ExternalDataConstants {
     public static final String FORMAT_CSV = "csv";
     public static final String TEST_RECORD_WITH_PK = "test-record-with-pk";
 
+    public static final String ERROR_LARGE_RECORD = "Record is too large";
+    public static final String ERROR_PARSE_RECORD = "Parser failed to parse record";
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/121e1d9a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 32139f1..42fe8bf 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -25,7 +25,7 @@ import java.util.Map;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.external.api.IDataParserFactory;
 import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
-import org.apache.asterix.external.api.IInputStreamProviderFactory;
+import org.apache.asterix.external.api.IInputStreamFactory;
 import org.apache.asterix.external.api.IRecordReaderFactory;
 import org.apache.asterix.external.library.ExternalLibraryManager;
 import org.apache.asterix.om.types.ARecordType;
@@ -116,13 +116,13 @@ public class ExternalDataUtils {
         return aString.trim().split(FeedConstants.NamingConstants.LIBRARY_NAME_SEPARATOR)[1];
     }
 
-    public static IInputStreamProviderFactory createExternalInputStreamFactory(String dataverse, String stream)
+    public static IInputStreamFactory createExternalInputStreamFactory(String dataverse, String stream)
             throws AsterixException {
         try {
             String libraryName = getLibraryName(stream);
             String className = getExternalClassName(stream);
             ClassLoader classLoader = getClassLoader(dataverse, libraryName);
-            return ((IInputStreamProviderFactory) (classLoader.loadClass(className).newInstance()));
+            return ((IInputStreamFactory) (classLoader.loadClass(className).newInstance()));
         } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
             throw new AsterixException("Failed to create stream factory", e);
         }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/121e1d9a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java
index 386a8cf..4eec348 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java
@@ -42,7 +42,7 @@ import org.apache.log4j.Logger;
 public class FileSystemWatcher {
 
     private static final Logger LOGGER = Logger.getLogger(FileSystemWatcher.class.getName());
-    private final WatchService watcher;
+    private WatchService watcher;
     private final HashMap<WatchKey, Path> keys;
     private final LinkedList<File> files = new LinkedList<File>();
     private Iterator<File> it;
@@ -53,17 +53,14 @@ public class FileSystemWatcher {
     private boolean done;
     private File current;
     private AbstractFeedDataFlowController controller;
+    private final LinkedList<Path> dirs;
 
-    public FileSystemWatcher(Path inputResource, String expression, boolean isFeed) throws HyracksDataException {
-        try {
-            this.watcher = isFeed ? FileSystems.getDefault().newWatchService() : null;
-            this.keys = isFeed ? new HashMap<WatchKey, Path>() : null;
-            this.expression = expression;
-            this.path = inputResource;
-            this.isFeed = isFeed;
-        } catch (IOException e) {
-            throw new HyracksDataException(e);
-        }
+    public FileSystemWatcher(Path inputResource, String expression, boolean isFeed) {
+        this.keys = isFeed ? new HashMap<WatchKey, Path>() : null;
+        this.expression = expression;
+        this.path = inputResource;
+        this.isFeed = isFeed;
+        this.dirs = new LinkedList<Path>();
     }
 
     public void setFeedLogManager(FeedLogManager feedLogManager) {
@@ -72,11 +69,19 @@ public class FileSystemWatcher {
 
     public void init() throws HyracksDataException {
         try {
-            LinkedList<Path> dirs = null;
-            dirs = new LinkedList<Path>();
+            dirs.clear();
             LocalFileSystemUtils.traverse(files, path.toFile(), expression, dirs);
             it = files.iterator();
             if (isFeed) {
+                keys.clear();
+                if (watcher != null) {
+                    try {
+                        watcher.close();
+                    } catch (IOException e) {
+                        LOGGER.warn("Failed to close watcher service", e);
+                    }
+                }
+                watcher = FileSystems.getDefault().newWatchService();
                 for (Path path : dirs) {
                     register(path);
                 }
@@ -125,7 +130,7 @@ public class FileSystemWatcher {
         return (WatchEvent<T>) event;
     }
 
-    private void handleEvents(WatchKey key) {
+    private void handleEvents(WatchKey key) throws IOException {
         // get dir associated with the key
         Path dir = keys.get(key);
         if (dir == null) {
@@ -143,7 +148,10 @@ public class FileSystemWatcher {
                 if (LOGGER.isEnabledFor(Level.WARN)) {
                     LOGGER.warn("Overflow event. Some events might have been missed");
                 }
-                continue;
+                // need to read and validate all files.
+                //TODO: use btrees for all logs
+                init();
+                return;
             }
 
             // Context for directory entry event is the file name of entry
@@ -172,6 +180,7 @@ public class FileSystemWatcher {
         if (!done) {
             if (watcher != null) {
                 watcher.close();
+                watcher = null;
             }
             if (logManager != null) {
                 if (current != null) {
@@ -205,7 +214,7 @@ public class FileSystemWatcher {
         return false;
     }
 
-    public boolean hasNext() throws HyracksDataException {
+    public boolean hasNext() throws IOException {
         if (it.hasNext()) {
             return true;
         }
@@ -222,6 +231,7 @@ public class FileSystemWatcher {
         while (key != null) {
             handleEvents(key);
             if (endOfEvents(key)) {
+                close();
                 return false;
             }
             key = watcher.poll();
@@ -237,12 +247,18 @@ public class FileSystemWatcher {
                 if (LOGGER.isEnabledFor(Level.WARN)) {
                     LOGGER.warn("Feed Closed");
                 }
-                return false;
+                if (watcher == null) {
+                    return false;
+                }
+                continue;
             } catch (ClosedWatchServiceException e) {
                 if (LOGGER.isEnabledFor(Level.WARN)) {
                     LOGGER.warn("The watcher has exited");
                 }
-                return false;
+                if (watcher == null) {
+                    return false;
+                }
+                continue;
             }
             handleEvents(key);
             if (endOfEvents(key)) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/121e1d9a/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
index 84ccc35..936ada5 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
@@ -28,7 +28,7 @@ import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
 import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.external.indexing.IndexingScheduler;
 import org.apache.asterix.external.indexing.RecordId.RecordIdType;
-import org.apache.asterix.external.input.stream.provider.HDFSInputStreamProvider;
+import org.apache.asterix.external.input.stream.HDFSInputStream;
 import org.apache.asterix.om.util.AsterixAppContextInfo;
 import org.apache.asterix.om.util.AsterixClusterProperties;
 import org.apache.hadoop.fs.BlockLocation;
@@ -186,7 +186,7 @@ public class HDFSUtils {
         conf.set(ExternalDataConstants.KEY_HADOOP_FILESYSTEM_URI,
                 configuration.get(ExternalDataConstants.KEY_HDFS_URL).trim());
         conf.set(ExternalDataConstants.KEY_HADOOP_FILESYSTEM_CLASS, ExternalDataConstants.CLASS_NAME_HDFS_FILESYSTEM);
-        conf.setClassLoader(HDFSInputStreamProvider.class.getClassLoader());
+        conf.setClassLoader(HDFSInputStream.class.getClassLoader());
         conf.set(ExternalDataConstants.KEY_HADOOP_INPUT_DIR, configuration.get(ExternalDataConstants.KEY_PATH).trim());
         conf.set(ExternalDataConstants.KEY_HADOOP_INPUT_FORMAT, formatClassName);
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/121e1d9a/asterix-external-data/src/test/java/org/apache/asterix/external/classad/AttributeReference.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/classad/AttributeReference.java b/asterix-external-data/src/test/java/org/apache/asterix/external/classad/AttributeReference.java
index 04fe6ec..da2cebd 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/classad/AttributeReference.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/classad/AttributeReference.java
@@ -18,41 +18,42 @@
  */
 package org.apache.asterix.external.classad;
 
+import org.apache.asterix.external.classad.object.pool.ClassAdObjectPool;
 import org.apache.asterix.om.base.AMutableInt32;
 import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class AttributeReference extends ExprTree {
 
-    private ExprTree expr;
+    private final ExprTreeHolder expr;
     private boolean absolute;
-    private AMutableCharArrayString attributeStr;
-    private ClassAd current = new ClassAd(false, false);
-    private ExprList adList = new ExprList();
-    private Value val = new Value();
-    private MutableBoolean rVal = new MutableBoolean(false);
+    private final AMutableCharArrayString attributeStr;
+    private final ExprList adList;
+    private final Value val;
     private AttributeReference tempAttrRef;
-    private EvalState tstate = new EvalState();
+    private final EvalState tstate;
+    private MutableBoolean rVal = new MutableBoolean(false);
+    private ClassAd current;
 
     public ExprTree getExpr() {
         return expr;
     }
 
     public void setExpr(ExprTree expr) {
-        this.expr = expr == null ? null : expr.self();
+        this.expr.setInnerTree(expr.self());
     }
 
-    public AttributeReference() {
-        expr = null;
-        attributeStr = null;
+    public AttributeReference(ClassAdObjectPool objectPool) {
+        super(objectPool);
+        this.val = new Value(objectPool);
+        this.current = new ClassAd(this.objectPool);
+        this.tstate = new EvalState(this.objectPool);
+        this.adList = new ExprList(this.objectPool);
+        this.attributeStr = new AMutableCharArrayString();
+        this.expr = new ExprTreeHolder(objectPool);
         absolute = false;
     }
 
-    /// Copy Constructor
-    public AttributeReference(AttributeReference ref) throws HyracksDataException {
-        copyFrom(ref);
-    }
-
     /// Assignment operator
     @Override
     public boolean equals(Object o) {
@@ -69,8 +70,9 @@ public class AttributeReference extends ExprTree {
         return NodeKind.ATTRREF_NODE;
     }
 
-    public static AttributeReference createAttributeReference(ExprTree expr, AMutableCharArrayString attrName) {
-        return createAttributeReference(expr, attrName, false);
+    public static AttributeReference createAttributeReference(ExprTree expr, AMutableCharArrayString attrName,
+            ClassAdObjectPool objectPool) {
+        return createAttributeReference(expr, attrName, false, objectPool);
     }
 
     /**
@@ -80,7 +82,7 @@ public class AttributeReference extends ExprTree {
      */
     @Override
     public ExprTree copy() throws HyracksDataException {
-        AttributeReference newTree = new AttributeReference();
+        AttributeReference newTree = objectPool.attrRefPool.get();
         newTree.copyFrom(this);
         return newTree;
     }
@@ -94,14 +96,8 @@ public class AttributeReference extends ExprTree {
      * @throws HyracksDataException
      */
     public boolean copyFrom(AttributeReference ref) throws HyracksDataException {
-        if (attributeStr == null) {
-            attributeStr = new AMutableCharArrayString(ref.attributeStr);
-        } else {
-            attributeStr.setValue(ref.attributeStr);
-        }
-        if (ref.expr != null) {
-            expr = ref.expr.copy();
-        }
+        attributeStr.setValue(ref.attributeStr);
+        expr.setInnerTree(ref.expr);
         super.copyFrom(ref);
         this.absolute = ref.absolute;
         return true;
@@ -127,7 +123,7 @@ public class AttributeReference extends ExprTree {
             if (absolute != other_ref.absolute || !attributeStr.equals(other_ref.attributeStr)) {
                 is_same = false;
             } else if ((expr == null && other_ref.expr == null) || (expr.equals(other_ref.expr))
-                    || (expr != null && other_ref.expr != null && ((AttributeReference) expr).sameAs(other_ref.expr))) {
+                    || (expr != null && other_ref.expr != null && expr.sameAs(other_ref.expr))) {
                 // Will this check result in infinite recursion? How do I stop it?
                 is_same = true;
             } else {
@@ -137,18 +133,9 @@ public class AttributeReference extends ExprTree {
         return is_same;
     }
 
-    // a private ctor for use in significant expr identification
-    private AttributeReference(ExprTree tree, AMutableCharArrayString attrname, boolean absolut) {
-        attributeStr = attrname;
-        expr = tree == null ? null : tree.self();
-        absolute = absolut;
-    }
-
     @Override
     public void privateSetParentScope(ClassAd parent) {
-        if (expr != null) {
-            expr.setParentScope(parent);
-        }
+        expr.setParentScope(parent);
     }
 
     public void getComponents(ExprTreeHolder tree, AMutableCharArrayString attr, MutableBoolean abs)
@@ -161,7 +148,7 @@ public class AttributeReference extends ExprTree {
     public EvalResult findExpr(EvalState state, ExprTreeHolder tree, ExprTreeHolder sig, boolean wantSig)
             throws HyracksDataException {
         // establish starting point for search
-        if (expr == null) {
+        if (expr.getTree() == null) {
             // "attr" and ".attr"
             current = absolute ? state.getRootAd() : state.getCurAd();
             if (absolute && (current == null)) { // NAC - circularity so no root
@@ -186,7 +173,7 @@ public class AttributeReference extends ExprTree {
         }
 
         if (val.isListValue()) {
-            ExprList eList = new ExprList();
+            ExprList eList = objectPool.exprListPool.get();
             //
             // iterate through exprList and apply attribute reference
             // to each exprTree
@@ -195,12 +182,12 @@ public class AttributeReference extends ExprTree {
                     return (EvalResult.EVAL_FAIL);
                 } else {
                     if (tempAttrRef == null) {
-                        tempAttrRef = new AttributeReference();
+                        tempAttrRef = objectPool.attrRefPool.get();
                     } else {
                         tempAttrRef.reset();
                     }
                     createAttributeReference(currExpr.copy(), attributeStr, false, tempAttrRef);
-                    val.clear();
+                    val.setUndefinedValue();
                     // Create new EvalState, within this scope, because
                     // attrRef is only temporary, so we do not want to
                     // cache the evaluated result in the outer state object.
@@ -212,8 +199,8 @@ public class AttributeReference extends ExprTree {
                         return (EvalResult.EVAL_FAIL);
                     }
 
-                    ClassAd evaledAd = new ClassAd();
-                    ExprList evaledList = new ExprList();
+                    ClassAd evaledAd = objectPool.classAdPool.get();
+                    ExprList evaledList = objectPool.exprListPool.get();
                     if (val.isClassAdValue(evaledAd)) {
                         eList.add(evaledAd);
                         continue;
@@ -221,12 +208,13 @@ public class AttributeReference extends ExprTree {
                         eList.add(evaledList.copy());
                         continue;
                     } else {
-                        eList.add(Literal.createLiteral(val));
+                        eList.add(Literal.createLiteral(val, objectPool));
                     }
                 }
             }
-            tree.setInnerTree(ExprList.createExprList(eList));
-            ClassAd newRoot = new ClassAd();
+
+            tree.setInnerTree(ExprList.createExprList(eList, objectPool));
+            ClassAd newRoot = objectPool.classAdPool.get();
             tree.setParentScope(newRoot);
             return EvalResult.EVAL_OK;
         }
@@ -242,8 +230,15 @@ public class AttributeReference extends ExprTree {
         if (current == null) {
             return EvalResult.EVAL_UNDEF;
         }
-        int rc = current.lookupInScope(attributeStr.toString(), tree, state);
-        if (expr == null && !absolute && rc == EvalResult.EVAL_UNDEF.ordinal() && current.getAlternateScope() != null) {
+        int rc = 0;
+        try {
+            rc = current.lookupInScope(attributeStr.toString(), tree, state);
+        } catch (Throwable th) {
+            th.printStackTrace();
+            throw th;
+        }
+        if (expr.getTree() == null && !absolute && rc == EvalResult.EVAL_UNDEF.ordinal()
+                && current.getAlternateScope() != null) {
             rc = current.getAlternateScope().lookupInScope(attributeStr.toString(), tree, state);
         }
         return EvalResult.values()[rc];
@@ -251,9 +246,10 @@ public class AttributeReference extends ExprTree {
 
     @Override
     public boolean publicEvaluate(EvalState state, Value val) throws HyracksDataException {
-        ExprTreeHolder tree = new ExprTreeHolder();
-        ExprTreeHolder dummy = new ExprTreeHolder();
-        ClassAd curAd = new ClassAd(state.getCurAd());
+        ExprTreeHolder tree = objectPool.mutableExprPool.get();
+        ExprTreeHolder dummy = objectPool.mutableExprPool.get();
+        ClassAd curAd = objectPool.classAdPool.get();
+        curAd.copyFrom(state.getCurAd());
         boolean rval;
         // find the expression and the evalstate
         switch (findExpr(state, tree, dummy, false)) {
@@ -286,10 +282,13 @@ public class AttributeReference extends ExprTree {
 
     @Override
     public boolean privateEvaluate(EvalState state, Value val, ExprTreeHolder sig) throws HyracksDataException {
-        ExprTreeHolder tree = new ExprTreeHolder();
-        ExprTreeHolder exprSig = new ExprTreeHolder();
-        ClassAd curAd = new ClassAd(state.getCurAd());
-        MutableBoolean rval = new MutableBoolean(true);
+        ExprTreeHolder tree = objectPool.mutableExprPool.get();
+        ExprTreeHolder exprSig = objectPool.mutableExprPool.get();
+        ClassAd curAd = objectPool.classAdPool.get();
+        curAd.copyFrom(state.getCurAd());
+        MutableBoolean rval = objectPool.boolPool.get();
+        rval.setValue(true);
+
         switch (findExpr(state, tree, exprSig, true)) {
             case EVAL_FAIL:
                 rval.setValue(false);
@@ -314,7 +313,9 @@ public class AttributeReference extends ExprTree {
             default:
                 throw new HyracksDataException("ClassAd:  Should not reach here");
         }
-        sig.setInnerTree((new AttributeReference(exprSig, attributeStr, absolute)));
+        AttributeReference newAttrRef = objectPool.attrRefPool.get();
+        newAttrRef.setValue(exprSig, attributeStr, absolute);
+        sig.setInnerTree(newAttrRef);
         state.getCurAd().setValue(curAd);
         return rval.booleanValue();
     }
@@ -322,8 +323,8 @@ public class AttributeReference extends ExprTree {
     @Override
     public boolean privateFlatten(EvalState state, Value val, ExprTreeHolder ntree, AMutableInt32 op)
             throws HyracksDataException {
-        ExprTreeHolder tree = new ExprTreeHolder();
-        ExprTreeHolder dummy = new ExprTreeHolder();
+        ExprTreeHolder tree = objectPool.mutableExprPool.get();
+        ExprTreeHolder dummy = objectPool.mutableExprPool.get();
         ClassAd curAd;
         boolean rval;
         ntree.setInnerTree(null); // Just to be safe...  wenger 2003-12-11.
@@ -338,8 +339,8 @@ public class AttributeReference extends ExprTree {
                 return true;
             case EVAL_UNDEF:
                 if (expr != null && state.isFlattenAndInline()) {
-                    ExprTreeHolder expr_ntree = new ExprTreeHolder();
-                    Value expr_val = new Value();
+                    ExprTreeHolder expr_ntree = objectPool.mutableExprPool.get();
+                    Value expr_val = objectPool.valuePool.get();
                     if (state.getDepthRemaining() <= 0) {
                         val.setErrorValue();
                         state.getCurAd().setValue(curAd);
@@ -349,7 +350,7 @@ public class AttributeReference extends ExprTree {
                     rval = expr.publicFlatten(state, expr_val, expr_ntree);
                     state.incrementDepth();
                     if (rval && expr_ntree.getInnerTree() != null) {
-                        ntree.setInnerTree(createAttributeReference(expr_ntree, attributeStr));
+                        ntree.setInnerTree(createAttributeReference(expr_ntree, attributeStr, objectPool));
                         if (ntree.getInnerTree() != null) {
                             state.getCurAd().setValue(curAd);
                             return true;
@@ -412,14 +413,16 @@ public class AttributeReference extends ExprTree {
      *            expr is not NULL, default value is false;
      */
     public static AttributeReference createAttributeReference(ExprTree tree, AMutableCharArrayString attrStr,
-            boolean absolut) {
-        return (new AttributeReference(tree, attrStr, absolut));
+            boolean absolut, ClassAdObjectPool objectPool) {
+        AttributeReference attrRef = objectPool.attrRefPool.get();
+        attrRef.setValue(tree, attrStr, absolut);
+        return attrRef;
     }
 
     public void setValue(ExprTree tree, AMutableCharArrayString attrStr, boolean absolut) {
         this.absolute = absolut;
-        this.attributeStr = attrStr;
-        this.expr = tree == null ? null : tree.self();
+        this.attributeStr.copyValue(attrStr.getValue(), attrStr.size());
+        this.expr.setInnerTree(tree == null ? null : tree.self());
     }
 
     public static void createAttributeReference(ExprTree tree, AMutableCharArrayString attrStr, boolean absolut,
@@ -429,8 +432,8 @@ public class AttributeReference extends ExprTree {
 
     @Override
     public boolean privateEvaluate(EvalState state, Value val) throws HyracksDataException {
-        ExprTreeHolder tree = new ExprTreeHolder();
-        ExprTreeHolder dummy = new ExprTreeHolder();
+        ExprTreeHolder tree = objectPool.mutableExprPool.get();
+        ExprTreeHolder dummy = objectPool.mutableExprPool.get();
         ClassAd curAd;
         boolean rval;
 
@@ -467,8 +470,12 @@ public class AttributeReference extends ExprTree {
 
     @Override
     public void reset() {
-        if (expr != null) {
-            expr.reset();
-        }
+        expr.reset();
+        val.reset();
+        current.reset();
+        tstate.reset();
+        adList.reset();
+        attributeStr.reset();
+        absolute = false;
     }
 }


Mime
View raw message