asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject [16/26] incubator-asterixdb git commit: Feed Fixes and Cleanup
Date Thu, 14 Jan 2016 20:32:10 GMT
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedMemoryManager.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedMemoryManager.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedMemoryManager.java
new file mode 100644
index 0000000..fb81373
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedMemoryManager.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.api;
+
+import org.apache.asterix.external.feed.api.IFeedMemoryComponent.Type;
+
+/**
+ * Provides management of memory allocated for handling feed data flow through the node controller
+ */
+public interface IFeedMemoryManager {
+
+    public static final int START_COLLECTION_SIZE = 20;
+    public static final int START_POOL_SIZE = 10;
+
+    /**
+     * Gets a memory component allocated from the feed memory budget
+     * 
+     * @param type
+     *            the kind of memory component that needs to be allocated
+     * @return a memory component allocated from the feed memory budget
+     * @see Type
+     */
+    public IFeedMemoryComponent getMemoryComponent(Type type);
+
+    /**
+     * Expand a memory component by the default increment
+     * 
+     * @param memoryComponent
+     * @return true if the expansion succeeded
+     *         false if the requested expansion violates the configured budget
+     */
+    public boolean expandMemoryComponent(IFeedMemoryComponent memoryComponent);
+
+    /**
+     * Releases the given memory component to reclaim the memory allocated for the component
+     * 
+     * @param memoryComponent
+     *            the memory component that is being reclaimed/released
+     */
+    public void releaseMemoryComponent(IFeedMemoryComponent memoryComponent);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedMessage.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedMessage.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedMessage.java
new file mode 100644
index 0000000..aa1af3a
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedMessage.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.api;
+
+import java.io.Serializable;
+
+import org.apache.hyracks.api.dataflow.value.JSONSerializable;
+
+/**
+ * A control message exchanged between {@link IFeedManager} and {@code CentralFeedManager} that requests an action or reports an event
+ */
+public interface IFeedMessage extends Serializable, JSONSerializable {
+
+    public enum MessageType {
+        END,
+        XAQL,
+        FEED_REPORT,
+        NODE_REPORT,
+        STORAGE_REPORT,
+        CONGESTION,
+        PREPARE_STALL,
+        TERMINATE_FLOW,
+        SCALE_IN_REQUEST,
+        COMMIT_ACK,
+        COMMIT_ACK_RESPONSE,
+        THROTTLING_ENABLED
+    }
+
+    /**
+     * Gets the type associated with this message
+     * 
+     * @return MessageType type associated with this message
+     */
+    public MessageType getMessageType();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedMessageService.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedMessageService.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedMessageService.java
new file mode 100644
index 0000000..42f71a7
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedMessageService.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.api;
+
+/**
+ * Provides the functionality of sending a message ({@code IFeedMessage}) to the {@code CentralFeedManager}
+ */
+public interface IFeedMessageService extends IFeedService {
+
+    /**
+     * Sends a message ({@code IFeedMessage}) to the {@code CentralFeedManager} running at the CC.
+     * The message is sent asynchronously.
+     * 
+     * @param message
+     *            the message to be sent
+     */
+    public void sendMessage(IFeedMessage message);
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedMetadataManager.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedMetadataManager.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedMetadataManager.java
new file mode 100644
index 0000000..3712678
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedMetadataManager.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.api;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+
+public interface IFeedMetadataManager {
+
+    /**
+     * @param feedConnectionId
+     *            connection id corresponding to the feed connection
+     * @param tuple
+     *            the erroneous tuple that raised an exception
+     * @param message
+     *            the message corresponding to the exception being raised
+     * @param feedManager
+     * @throws AsterixException
+     */
+    public void logTuple(FeedConnectionId feedConnectionId, String tuple, String message, IFeedManager feedManager)
+            throws AsterixException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedMetricCollector.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedMetricCollector.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedMetricCollector.java
new file mode 100644
index 0000000..c35587c
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedMetricCollector.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.api;
+
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
+
+public interface IFeedMetricCollector {
+
+    public enum ValueType {
+        CPU_USAGE,
+        INFLOW_RATE,
+        OUTFLOW_RATE
+    }
+
+    public enum MetricType {
+        AVG,
+        RATE
+    }
+
+    public boolean sendReport(int senderId, int value);
+
+    public int getMetric(int senderId);
+
+    public int getMetric(FeedConnectionId connectionId, FeedRuntimeId runtimeId, ValueType valueType);
+
+    int createReportSender(FeedConnectionId connectionId, FeedRuntimeId runtimeId, ValueType valueType,
+            MetricType metricType);
+
+    public void removeReportSender(int senderId);
+
+    public void resetReportSender(int senderId);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedOperatorOutputSideHandler.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedOperatorOutputSideHandler.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedOperatorOutputSideHandler.java
new file mode 100644
index 0000000..a8d0552
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedOperatorOutputSideHandler.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.api;
+
+import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.hyracks.api.comm.IFrameWriter;
+
+/**
+ * Provides for output-side buffering for a feed runtime.
+ * Right now, we only have a single output side handler
+ * and we can probably remove it completely.
+ *              ______
+ *             |      |
+ * ============|core  |============
+ * ============| op   |============
+ *             |______|^^^^^^^^^^^^
+ *                     Output Side
+ *                       Handler
+ *
+ **/
+public interface IFeedOperatorOutputSideHandler extends IFrameWriter {
+
+    public enum Type {
+        BASIC_FEED_OUTPUT_HANDLER,
+        DISTRIBUTE_FEED_OUTPUT_HANDLER,
+        COLLECT_TRANSFORM_FEED_OUTPUT_HANDLER
+    }
+
+    public FeedId getFeedId();
+
+    public Type getType();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedProvider.java
new file mode 100644
index 0000000..9eced07
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedProvider.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.api;
+
+import org.apache.asterix.external.feed.management.FeedId;
+
+public interface IFeedProvider {
+
+    public void subscribeFeed(FeedId sourceDeedId);
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedRuntime.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedRuntime.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedRuntime.java
new file mode 100644
index 0000000..269725d
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedRuntime.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.api;
+
+import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
+import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
+import org.apache.hyracks.api.comm.IFrameWriter;
+
+public interface IFeedRuntime {
+
+    public enum FeedRuntimeType {
+        INTAKE,
+        COLLECT,
+        COMPUTE_COLLECT,
+        COMPUTE,
+        STORE,
+        OTHER,
+        ETS,
+        JOIN
+    }
+
+    public enum Mode {
+        PROCESS,
+        SPILL,
+        PROCESS_SPILL,
+        DISCARD,
+        POST_SPILL_DISCARD,
+        PROCESS_BACKLOG,
+        STALL,
+        FAIL,
+        END
+    }
+
+    /**
+     * @return the unique runtime id associated with the feedRuntime
+     */
+    public FeedRuntimeId getRuntimeId();
+
+    /**
+     * @return the frame writer associated with the feed runtime.
+     */
+    public IFrameWriter getFeedFrameWriter();
+
+    public FeedRuntimeInputHandler getInputHandler();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedService.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedService.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedService.java
new file mode 100644
index 0000000..3d3e0e5
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedService.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.api;
+
+public interface IFeedService {
+
+    public void start() throws Exception;
+
+    public void stop();
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedSubscriptionManager.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedSubscriptionManager.java
new file mode 100644
index 0000000..ec4c396
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedSubscriptionManager.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.api;
+
+import org.apache.asterix.external.feed.runtime.SubscribableFeedRuntimeId;
+
+public interface IFeedSubscriptionManager {
+
+    /**
+     * @param subscribableRuntime
+     */
+    public void registerFeedSubscribableRuntime(ISubscribableRuntime subscribableRuntime);
+
+    /**
+     * @param subscribableRuntimeId
+     */
+    public void deregisterFeedSubscribableRuntime(SubscribableFeedRuntimeId subscribableRuntimeId);
+
+    /**
+     * @param subscribableRuntimeId
+     * @return
+     */
+    public ISubscribableRuntime getSubscribableRuntime(SubscribableFeedRuntimeId subscribableRuntimeId);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedTrackingManager.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedTrackingManager.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedTrackingManager.java
new file mode 100644
index 0000000..6576e09
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedTrackingManager.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.api;
+
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.message.FeedTupleCommitAckMessage;
+
+public interface IFeedTrackingManager {
+
+    public void submitAckReport(FeedTupleCommitAckMessage ackMessage);
+
+    public void disableAcking(FeedConnectionId connectionId);
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedWork.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedWork.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedWork.java
new file mode 100644
index 0000000..2f5379b
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedWork.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.api;
+
+/**
+ * Represents a feed management task. The task is executed asynchronously.
+ */
+public interface IFeedWork {
+
+    public Runnable getRunnable();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedWorkEventListener.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedWorkEventListener.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedWorkEventListener.java
new file mode 100644
index 0000000..e5797d0
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedWorkEventListener.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.api;
+
+/**
+ * Provides a callback mechanism that is invoked for events related to
+ * the execution of a feed management task.
+ */
+public interface IFeedWorkEventListener {
+
+    /**
+     * A callback that is invoked after successful completion of a feed
+     * management task.
+     */
+    public void workCompleted(IFeedWork work);
+
+    /**
+     * A callback that is invoked after a failed execution of a feed
+     * management task.
+     * 
+     * @param e
+     *            exception encountered during execution of the task.
+     */
+    public void workFailed(IFeedWork work, Exception e);
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedWorkManager.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedWorkManager.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedWorkManager.java
new file mode 100644
index 0000000..37661b7
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedWorkManager.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.api;
+
+public interface IFeedWorkManager {
+
+    public void submitWork(IFeedWork work, IFeedWorkEventListener listener);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFrameEventCallback.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFrameEventCallback.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFrameEventCallback.java
new file mode 100644
index 0000000..647d847
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFrameEventCallback.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.api;
+
+public interface IFrameEventCallback {
+
+    public enum FrameEvent {
+        FINISHED_PROCESSING,
+        PENDING_WORK_THRESHOLD_REACHED,
+        PENDING_WORK_DONE,
+        NO_OP,
+        FINISHED_PROCESSING_SPILLAGE
+    }
+
+    public void frameEvent(FrameEvent frameEvent);
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFramePostProcessor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFramePostProcessor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFramePostProcessor.java
new file mode 100644
index 0000000..eab7a64
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFramePostProcessor.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.api;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+/**
+ * Hook that is handed a frame together with an accessor for its tuples.
+ * NOTE(review): the name suggests it runs after the frame has been processed - confirm against callers.
+ */
+public interface IFramePostProcessor {
+
+    /**
+     * @param frame
+     *            the frame buffer to post-process
+     * @param frameAccessor
+     *            a tuple accessor supplied alongside the frame
+     */
+    void postProcessFrame(ByteBuffer frame, FrameTupleAccessor frameAccessor);
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFramePreprocessor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFramePreprocessor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFramePreprocessor.java
new file mode 100644
index 0000000..55461b7
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFramePreprocessor.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.api;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Hook that is handed a frame buffer.
+ * NOTE(review): the name suggests it runs before the frame is processed - confirm against callers.
+ */
+public interface IFramePreprocessor {
+
+    /**
+     * @param frame
+     *            the frame buffer to pre-process
+     * @throws Exception
+     *             if the implementation fails to pre-process the frame
+     */
+    void preProcess(ByteBuffer frame) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IIntakeProgressTracker.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IIntakeProgressTracker.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IIntakeProgressTracker.java
new file mode 100644
index 0000000..4848ed8
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IIntakeProgressTracker.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.api;
+
+import java.util.Map;
+
+/**
+ * Receives a string-to-string configuration and timestamps of ingested tuples.
+ */
+public interface IIntakeProgressTracker {
+
+    /**
+     * @param configuration
+     *            key/value settings for this tracker
+     */
+    void configure(Map<String, String> configuration);
+
+    /**
+     * @param timestamp
+     *            timestamp of an ingested tuple (unit/epoch not visible here - TODO confirm)
+     */
+    void notifyIngestedTupleTimestamp(long timestamp);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IMessageReceiver.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IMessageReceiver.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IMessageReceiver.java
new file mode 100644
index 0000000..bdfbfdb
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IMessageReceiver.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.api;
+
+/**
+ * A receiver of messages of type {@code T} that can be started and closed.
+ *
+ * @param <T>
+ *            the message type
+ */
+public interface IMessageReceiver<T> {
+
+    /**
+     * @param message
+     *            the message handed to this receiver
+     */
+    void sendMessage(T message);
+
+    /**
+     * @param processPending
+     *            presumably whether messages still pending should be processed before closing - verify against
+     *            implementations
+     */
+    void close(boolean processPending);
+
+    /**
+     * Starts this receiver.
+     */
+    void start();
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ISubscribableRuntime.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ISubscribableRuntime.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ISubscribableRuntime.java
new file mode 100644
index 0000000..ee07188
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ISubscribableRuntime.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.api;
+
+import java.util.List;
+
+import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter;
+import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+import org.apache.asterix.external.feed.runtime.CollectionRuntime;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+
+/**
+ * Represents a feed runtime whose output can be routed along other parallel path(s).
+ */
+public interface ISubscribableRuntime extends IFeedRuntime {
+
+    /**
+     * Subscribes a collection runtime to this runtime's output.
+     *
+     * @param fpa
+     *            accessor for the feed policy to apply to the subscription
+     * @param collectionRuntime
+     *            the runtime that subscribes
+     * @throws Exception
+     *             if the subscription cannot be established
+     */
+    void subscribeFeed(FeedPolicyAccessor fpa, CollectionRuntime collectionRuntime) throws Exception;
+
+    /**
+     * Removes a collection runtime's subscription.
+     *
+     * @param collectionRuntime
+     *            the runtime that unsubscribes
+     * @throws Exception
+     *             if the subscription cannot be removed
+     */
+    void unsubscribeFeed(CollectionRuntime collectionRuntime) throws Exception;
+
+    /**
+     * @return the subscribers of this runtime
+     */
+    List<ISubscriberRuntime> getSubscribers();
+
+    /**
+     * @return the {@link DistributeFeedFrameWriter} associated with this runtime
+     */
+    DistributeFeedFrameWriter getFeedFrameWriter();
+
+    /**
+     * @return the {@link RecordDescriptor} associated with this runtime
+     */
+    RecordDescriptor getRecordDescriptor();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ISubscriberRuntime.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ISubscriberRuntime.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ISubscriberRuntime.java
new file mode 100644
index 0000000..4d3e607
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ISubscriberRuntime.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.api;
+
+import java.util.Map;
+
+import org.apache.asterix.external.feed.dataflow.FeedFrameCollector;
+
+/**
+ * A runtime that exposes its feed policy and its {@link FeedFrameCollector}.
+ */
+public interface ISubscriberRuntime {
+
+    /**
+     * @return the feed policy as key/value pairs
+     */
+    Map<String, String> getFeedPolicy();
+
+    /**
+     * @return the frame collector of this subscriber
+     */
+    FeedFrameCollector getFrameCollector();
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ISubscriptionProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ISubscriptionProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ISubscriptionProvider.java
new file mode 100644
index 0000000..b94a52e
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ISubscriptionProvider.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.api;
+
+import org.apache.asterix.external.feed.management.FeedId;
+
+/**
+ * Manages subscriptions between a source feed and a recipient feed, both identified by {@link FeedId}.
+ */
+public interface ISubscriptionProvider {
+
+    /**
+     * @param sourceFeedId
+     *            the feed being subscribed to
+     * @param recipientFeedId
+     *            the feed that subscribes
+     */
+    void subscribeFeed(FeedId sourceFeedId, FeedId recipientFeedId);
+
+    /**
+     * @param sourceFeedId
+     *            the feed that was subscribed to
+     * @param recipientFeedId
+     *            the feed whose subscription is removed
+     */
+    void unsubscribeFeed(FeedId sourceFeedId, FeedId recipientFeedId);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/CollectTransformFeedFrameWriter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/CollectTransformFeedFrameWriter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/CollectTransformFeedFrameWriter.java
new file mode 100644
index 0000000..18b6ec0
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/CollectTransformFeedFrameWriter.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.dataflow;
+
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.external.feed.api.IFeedOperatorOutputSideHandler;
+import org.apache.asterix.external.feed.api.ISubscribableRuntime;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+
+/**
+ * Simply a delivery frame writer: for every tuple of an incoming frame, field 0 is copied into a single-field
+ * tuple that is appended to this writer's own output frame; the output frame is handed to the downstream writer
+ * whenever it cannot take another tuple, and once more on {@link #close()} if it still holds tuples.
+ * TODO: we can probably get rid of this class at some point.
+ */
+public class CollectTransformFeedFrameWriter implements IFeedOperatorOutputSideHandler {
+
+    private final FeedConnectionId connectionId;                // [Dataverse - Feed - Dataset]
+    private IFrameWriter downstreamWriter;                      // Writer to next (Operator/Connector)
+    private final FrameTupleAccessor inputFrameTupleAccessor;   // Accessing input frame tuples
+    private final FrameTupleAppender tupleAppender;             // Append tuples to output frame
+    private final IFrame frame;                                 // Output frame
+
+    private final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(1);
+
+    /** Set by {@link #fail()} so that {@link #close()} does not push buffered tuples after a failure. */
+    private boolean failed = false;
+
+    /**
+     * @param ctx
+     *            task context used to allocate the output frame
+     * @param downstreamWriter
+     *            the writer that receives the output frames
+     * @param sourceRuntime
+     *            the runtime whose record descriptor describes the incoming frames
+     * @param outputRecordDescriptor
+     *            not used by this class; kept for callers
+     * @param connectionId
+     *            the feed connection this writer belongs to
+     * @throws HyracksDataException
+     *             if the output frame cannot be created
+     */
+    public CollectTransformFeedFrameWriter(IHyracksTaskContext ctx, IFrameWriter downstreamWriter,
+            ISubscribableRuntime sourceRuntime, RecordDescriptor outputRecordDescriptor, FeedConnectionId connectionId)
+                    throws HyracksDataException {
+        this.connectionId = connectionId;
+        this.downstreamWriter = downstreamWriter;
+        inputFrameTupleAccessor = new FrameTupleAccessor(sourceRuntime.getRecordDescriptor());
+        frame = new VSizeFrame(ctx);
+        tupleAppender = new FrameTupleAppender(frame);
+    }
+
+    /** Opens the downstream writer. */
+    @Override
+    public void open() throws HyracksDataException {
+        downstreamWriter.open();
+    }
+
+    /**
+     * Copies field 0 of every tuple in {@code buffer} into the output frame.
+     *
+     * @param buffer
+     *            the incoming frame
+     * @throws HyracksDataException
+     *             if a full output frame cannot be delivered downstream
+     */
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        inputFrameTupleAccessor.reset(buffer);
+        int nTuple = inputFrameTupleAccessor.getTupleCount();
+        for (int t = 0; t < nTuple; t++) {
+            // Reset before building so that a previously failed append cannot leave stale fields behind.
+            tupleBuilder.reset();
+            tupleBuilder.addField(inputFrameTupleAccessor, t, 0);
+            appendTupleToFrame();
+        }
+    }
+
+    /**
+     * Appends the tuple currently held by the builder to the output frame; if the frame is full it is flushed
+     * downstream first and the append is retried once on the emptied frame.
+     */
+    private void appendTupleToFrame() throws HyracksDataException {
+        if (!tupleAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+                tupleBuilder.getSize())) {
+            FrameUtils.flushFrame(frame.getBuffer(), downstreamWriter);
+            tupleAppender.reset(frame, true);
+            if (!tupleAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+                    tupleBuilder.getSize())) {
+                throw new IllegalStateException("Unable to append a tuple of " + tupleBuilder.getSize()
+                        + " bytes to an empty output frame");
+            }
+        }
+    }
+
+    /** Records the failure and propagates it to the downstream writer. */
+    @Override
+    public void fail() throws HyracksDataException {
+        failed = true;
+        downstreamWriter.fail();
+    }
+
+    /**
+     * Delivers any tuples still buffered in the output frame (unless {@link #fail()} was called) and then closes
+     * the downstream writer. The downstream writer is closed even if the final flush throws.
+     */
+    @Override
+    public void close() throws HyracksDataException {
+        try {
+            // Without this flush the last, partially filled frame would never reach the downstream writer.
+            if (!failed && tupleAppender.getTupleCount() > 0) {
+                FrameUtils.flushFrame(frame.getBuffer(), downstreamWriter);
+                tupleAppender.reset(frame, true);
+            }
+        } finally {
+            downstreamWriter.close();
+        }
+    }
+
+    /** @return the feed id of the connection this writer belongs to */
+    @Override
+    public FeedId getFeedId() {
+        return connectionId.getFeedId();
+    }
+
+    /** @return {@link Type#COLLECT_TRANSFORM_FEED_OUTPUT_HANDLER} */
+    @Override
+    public Type getType() {
+        return Type.COLLECT_TRANSFORM_FEED_OUTPUT_HANDLER;
+    }
+
+    /** @return the writer that currently receives the output frames */
+    public IFrameWriter getDownstreamWriter() {
+        return downstreamWriter;
+    }
+
+    /** @return the feed connection this writer belongs to */
+    public FeedConnectionId getConnectionId() {
+        return connectionId;
+    }
+
+    /**
+     * Replaces the downstream writer.
+     *
+     * @param writer
+     *            the writer that receives output frames from now on
+     */
+    public void reset(IFrameWriter writer) {
+        this.downstreamWriter = writer;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DataBucket.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DataBucket.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DataBucket.java
new file mode 100644
index 0000000..3943ced
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DataBucket.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.dataflow;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A {@link DataBucket} is a wrapper around a {@link ByteBuffer} that expects a certain number of recipients.
+ * Once that many readers have reported that they are done, the bucket hands itself back to its
+ * {@link DataBucketPool}.
+ */
+public class DataBucket {
+
+    /** Source of bucket ids; incremented once per constructed bucket. */
+    private static final AtomicInteger globalBucketId = new AtomicInteger(0);
+
+    /** Content */
+    private final ByteBuffer content;
+    /** How many reads so far */
+    private final AtomicInteger readCount;
+    /** Id */
+    private final int bucketId;
+    /** Number of expected readers */
+    private int desiredReadCount;
+    /** Data, End of stream, or End of spilled data */
+    private ContentType contentType;
+    /** Pool this bucket is returned to */
+    private final DataBucketPool pool;
+
+    public enum ContentType {
+        DATA, // data (feed tuple)
+        EOD, // A signal indicating that there shall be no more data
+        EOSD // End of processing of spilled data
+    }
+
+    /**
+     * Creates a {@link ContentType#DATA} bucket whose buffer has the pool's frame size.
+     *
+     * @param pool
+     *            the pool this bucket belongs to
+     */
+    public DataBucket(DataBucketPool pool) {
+        final int frameSize = pool.getFrameSize();
+        this.content = ByteBuffer.allocate(frameSize);
+        this.readCount = new AtomicInteger(0);
+        this.pool = pool;
+        this.contentType = ContentType.DATA;
+        this.bucketId = globalBucketId.incrementAndGet();
+    }
+
+    /**
+     * Copies the first {@code frame.limit()} bytes of the frame's backing array into this bucket and leaves the
+     * content positioned at 0 with the same limit. A {@code null} frame leaves the bucket untouched.
+     *
+     * @param frame
+     *            the frame to copy, or {@code null}
+     */
+    public synchronized void reset(ByteBuffer frame) {
+        if (frame == null) {
+            return;
+        }
+        final int length = frame.limit();
+        content.flip();
+        System.arraycopy(frame.array(), 0, content.array(), 0, length);
+        content.limit(length);
+        content.position(0);
+    }
+
+    /**
+     * Registers one completed read. When the number of reads equals the desired read count, the counter is
+     * zeroed and the bucket is returned to its pool.
+     */
+    public synchronized void doneReading() {
+        final int reads = readCount.incrementAndGet();
+        if (reads != desiredReadCount) {
+            return;
+        }
+        readCount.set(0);
+        pool.returnDataBucket(this);
+    }
+
+    /**
+     * @param rCount
+     *            number of readers expected to call {@link #doneReading()}
+     */
+    public void setDesiredReadCount(int rCount) {
+        this.desiredReadCount = rCount;
+    }
+
+    /** @return the kind of content this bucket carries */
+    public ContentType getContentType() {
+        return contentType;
+    }
+
+    /**
+     * @param contentType
+     *            the kind of content this bucket carries
+     */
+    public void setContentType(ContentType contentType) {
+        this.contentType = contentType;
+    }
+
+    /** @return the buffer holding this bucket's content (the buffer itself, not a copy) */
+    public synchronized ByteBuffer getContent() {
+        return content;
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder text = new StringBuilder("DataBucket [");
+        text.append(bucketId).append("]");
+        text.append(" (").append(readCount).append(",").append(desiredReadCount).append(")");
+        return text.toString();
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DataBucketPool.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DataBucketPool.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DataBucketPool.java
new file mode 100644
index 0000000..339469e
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DataBucketPool.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.dataflow;
+
+import java.util.Stack;
+
+import org.apache.asterix.external.feed.api.IFeedMemoryComponent;
+import org.apache.asterix.external.feed.api.IFeedMemoryManager;
+
+/**
+ * A pool of reusable {@link DataBucket} instances, registered as a memory component with the
+ * {@link IFeedMemoryManager}.
+ */
+public class DataBucketPool implements IFeedMemoryComponent {
+
+    /** A unique identifier for the memory component **/
+    private final int componentId;
+
+    /** The {@link IFeedMemoryManager} asked for more memory when the pool runs dry **/
+    private final IFeedMemoryManager memoryManager;
+
+    /** The data buckets {@link DataBucket} currently available **/
+    private final Stack<DataBucket> pool;
+
+    /** The total number of data buckets {@link DataBucket} allocated **/
+    private int totalAllocation;
+
+    /** The frame size used for the buffer of every bucket **/
+    private final int frameSize;
+
+    /**
+     * @param componentId
+     *            identifier of this memory component
+     * @param memoryManager
+     *            the manager consulted when the pool is empty
+     * @param size
+     *            number of buckets to create up front
+     * @param frameSize
+     *            buffer size of each bucket
+     */
+    public DataBucketPool(int componentId, IFeedMemoryManager memoryManager, int size, int frameSize) {
+        this.componentId = componentId;
+        this.memoryManager = memoryManager;
+        this.frameSize = frameSize;
+        this.pool = new Stack<DataBucket>();
+        expand(size);
+    }
+
+    /**
+     * Puts a bucket back into the pool.
+     *
+     * @param bucket
+     *            the bucket that is available again
+     */
+    public synchronized void returnDataBucket(DataBucket bucket) {
+        pool.push(bucket);
+    }
+
+    /**
+     * Takes a bucket out of the pool. If the pool is empty the memory manager is asked to expand this component
+     * first.
+     *
+     * @return a bucket, or {@code null} if the pool is empty and the memory manager refused to expand it
+     */
+    public synchronized DataBucket getDataBucket() {
+        if (pool.isEmpty() && !memoryManager.expandMemoryComponent(this)) {
+            return null;
+        }
+        return pool.pop();
+    }
+
+    @Override
+    public Type getType() {
+        return Type.POOL;
+    }
+
+    @Override
+    public int getTotalAllocation() {
+        return totalAllocation;
+    }
+
+    @Override
+    public int getComponentId() {
+        return componentId;
+    }
+
+    /**
+     * Creates {@code delta} new buckets, adds them to the pool and counts them in the total allocation.
+     */
+    @Override
+    public void expand(int delta) {
+        for (int created = 0; created < delta; created++) {
+            pool.push(new DataBucket(this));
+        }
+        totalAllocation += delta;
+    }
+
+    /**
+     * Discards every bucket currently in the pool and subtracts them from the total allocation. Buckets that
+     * are handed out at the time of the call stay counted.
+     */
+    @Override
+    public void reset() {
+        final int available = pool.size();
+        totalAllocation -= available;
+        pool.clear();
+    }
+
+    @Override
+    public String toString() {
+        return "DataBucketPool[" + componentId + "](" + totalAllocation + ")";
+    }
+
+    /** @return the number of buckets currently available in the pool */
+    public int getSize() {
+        return pool.size();
+    }
+
+    /** @return the buffer size of each bucket */
+    public int getFrameSize() {
+        return frameSize;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DistributeFeedFrameWriter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DistributeFeedFrameWriter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DistributeFeedFrameWriter.java
new file mode 100644
index 0000000..7367d5a
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DistributeFeedFrameWriter.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.dataflow;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.external.feed.api.IFeedManager;
+import org.apache.asterix.external.feed.api.IFeedOperatorOutputSideHandler;
+import org.apache.asterix.external.feed.api.IFeedOperatorOutputSideHandler.Type;
+import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+/**
+ * Provides mechanism for distributing the frames, as received from an operator to a
+ * set of registered readers. Each reader typically operates at a different pace. Readers
+ * are isolated from each other to ensure that a slow reader does not impact the progress of
+ * others.
+ **/
+public class DistributeFeedFrameWriter implements IFrameWriter {
+
+    private static final Logger LOGGER = Logger.getLogger(DistributeFeedFrameWriter.class.getName());
+
+    /** A unique identifier for the feed to which the incoming tuples belong. **/
+    private final FeedId feedId;
+
+    /**
+     * An instance of FrameDistributor that provides the mechanism for distributing a frame to multiple readers, each
+     * operating in isolation.
+     **/
+    private final FrameDistributor frameDistributor;
+
+    /** The original frame writer instantiated as part of job creation **/
+    private IFrameWriter writer;
+
+    /** The feed operation whose output is being distributed by the DistributeFeedFrameWriter **/
+    private final FeedRuntimeType feedRuntimeType;
+
+    /** The value of the partition 'i' if this is the i'th instance of the associated operator **/
+    private final int partition;
+
+    public DistributeFeedFrameWriter(IHyracksTaskContext ctx, FeedId feedId, IFrameWriter writer,
+            FeedRuntimeType feedRuntimeType, int partition, FrameTupleAccessor fta, IFeedManager feedManager)
+                    throws IOException {
+        this.feedId = feedId;
+        this.frameDistributor = new FrameDistributor(feedId, feedRuntimeType, partition, true,
+                feedManager.getFeedMemoryManager(), fta);
+        this.feedRuntimeType = feedRuntimeType;
+        this.partition = partition;
+        this.writer = writer;
+    }
+
+    /**
+     * @param fpa
+     *            Feed policy accessor
+     * @param frameWriter
+     *            the writer which will deliver the buffers
+     * @param connectionId
+     *            (Dataverse - Dataset - Feed)
+     * @return A frame collector.
+     * @throws Exception
+     */
+    public FeedFrameCollector subscribeFeed(FeedPolicyAccessor fpa, IFrameWriter frameWriter,
+            FeedConnectionId connectionId) throws Exception {
+        FeedFrameCollector collector = null;
+        if (!frameDistributor.isRegistered(frameWriter)) {
+            collector = new FeedFrameCollector(frameDistributor, fpa, frameWriter, connectionId);
+            frameDistributor.registerFrameCollector(collector);
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Registered subscriber, new mode " + frameDistributor.getMode());
+            }
+            return collector;
+        } else {
+            throw new IllegalStateException("subscriber " + feedId + " already registered");
+        }
+    }
+
+    public void unsubscribeFeed(IFrameWriter recipientFeedFrameWriter) throws Exception {
+        boolean success = frameDistributor.deregisterFrameCollector(recipientFeedFrameWriter);
+        if (!success) {
+            throw new IllegalStateException(
+                    "Invalid attempt to unregister FeedFrameWriter " + recipientFeedFrameWriter + " not registered.");
+        }
+    }
+
+    public void notifyEndOfFeed() {
+        frameDistributor.notifyEndOfFeed();
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        try {
+            frameDistributor.close();
+        } finally {
+            writer.close();
+        }
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        writer.fail();
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer frame) throws HyracksDataException {
+        frameDistributor.nextFrame(frame);
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        writer.open();
+    }
+
+    public Map<IFrameWriter, FeedFrameCollector> getRegisteredReaders() {
+        return frameDistributor.getRegisteredReaders();
+    }
+
+    public void setWriter(IFrameWriter writer) {
+        this.writer = writer;
+    }
+
+    public Type getType() {
+        return IFeedOperatorOutputSideHandler.Type.DISTRIBUTE_FEED_OUTPUT_HANDLER;
+    }
+
+    @Override
+    public String toString() {
+        return feedId.toString() + feedRuntimeType + "[" + partition + "]";
+    }
+
+    public FrameDistributor.DistributionMode getDistributionMode() {
+        return frameDistributor.getDistributionMode();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedCollectRuntimeInputHandler.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedCollectRuntimeInputHandler.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedCollectRuntimeInputHandler.java
new file mode 100644
index 0000000..6761d9a
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedCollectRuntimeInputHandler.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.dataflow;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.external.feed.api.IFeedManager;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+/**
+ * A {@link FeedRuntimeInputHandler} that additionally hands every incoming frame to a
+ * {@link FeedFrameCache} before the regular processing takes place, so that cached frames
+ * can later be replayed or dropped by record id.
+ */
+public class FeedCollectRuntimeInputHandler extends FeedRuntimeInputHandler {
+
+    /** Cache that receives each frame passed to {@link #process(ByteBuffer)}. */
+    private final FeedFrameCache feedFrameCache;
+
+    /**
+     * Builds the handler and a frame cache that replays into {@code coreOperator}.
+     * All arguments are passed unchanged to the superclass constructor.
+     *
+     * @throws IOException
+     *             if the superclass constructor fails
+     */
+    public FeedCollectRuntimeInputHandler(IHyracksTaskContext ctx, FeedConnectionId connectionId, FeedRuntimeId runtimeId,
+            IFrameWriter coreOperator, FeedPolicyAccessor fpa, boolean bufferingEnabled,
+            FrameTupleAccessor fta, RecordDescriptor recordDesc, IFeedManager feedManager, int nPartitions)
+            throws IOException {
+        super(ctx, connectionId, runtimeId, coreOperator, fpa, bufferingEnabled, fta, recordDesc, feedManager,
+                nPartitions);
+        feedFrameCache = new FeedFrameCache(ctx, fta, coreOperator);
+    }
+
+    /**
+     * Sends the frame to the cache and then processes it as the superclass does.
+     *
+     * @param frame
+     *            the incoming frame
+     * @throws HyracksDataException
+     *             if the superclass processing fails
+     */
+    public void process(ByteBuffer frame) throws HyracksDataException {
+        // Cache first so the frame is handed to the cache even when the regular processing fails.
+        feedFrameCache.sendMessage(frame);
+        super.process(frame);
+    }
+
+    /**
+     * Drops cached frames up to the given record id.
+     *
+     * @param recordId
+     *            the record id passed to the cache as the drop boundary
+     */
+    public void dropTill(int recordId) {
+        feedFrameCache.dropTillRecordId(recordId);
+    }
+
+    /**
+     * Replays cached records starting at the given record id.
+     *
+     * @param recordId
+     *            the record id to start the replay from
+     * @throws HyracksDataException
+     *             if the replay fails
+     */
+    public void replayFrom(int recordId) throws HyracksDataException {
+        feedFrameCache.replayRecords(recordId);
+    }
+
+    /**
+     * Replays every frame currently held by the cache.
+     *
+     * @throws HyracksDataException
+     *             if the replay fails
+     */
+    public void replayCached() throws HyracksDataException {
+        feedFrameCache.replayAll();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedExceptionHandler.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedExceptionHandler.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedExceptionHandler.java
new file mode 100644
index 0000000..483ba19
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedExceptionHandler.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.dataflow;
+
+import java.nio.ByteBuffer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.FrameDataException;
+import org.apache.asterix.external.feed.api.IExceptionHandler;
+import org.apache.asterix.external.feed.api.IFeedManager;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.util.FeedFrameUtil;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+/**
+ * Handles exceptions raised while processing a feed frame. For a {@link FrameDataException}
+ * the offending tuple is removed from the frame; any other exception is not handled.
+ */
+public class FeedExceptionHandler implements IExceptionHandler {
+
+    private static final Logger LOGGER = Logger.getLogger(FeedExceptionHandler.class.getName());
+
+    //TODO: Enable logging
+    private final IHyracksTaskContext ctx;
+    private final FrameTupleAccessor fta;
+
+    /**
+     * Only {@code ctx} and {@code fta} are retained; the remaining arguments are currently unused.
+     */
+    public FeedExceptionHandler(IHyracksTaskContext ctx, FrameTupleAccessor fta, RecordDescriptor recordDesc,
+            IFeedManager feedManager, FeedConnectionId connectionId) {
+        this.ctx = ctx;
+        this.fta = fta;
+    }
+
+    /**
+     * Attempts to recover from an exception raised while processing {@code frame}.
+     *
+     * @param e
+     *            the exception that was raised
+     * @param frame
+     *            the frame being processed when the exception occurred
+     * @return the result of removing the offending tuple from the frame, or {@code null} when
+     *         {@code e} is not a {@link FrameDataException} or the removal itself fails
+     */
+    @Override
+    public ByteBuffer handleException(Exception e, ByteBuffer frame) {
+        if (!(e instanceof FrameDataException)) {
+            return null;
+        }
+        try {
+            fta.reset(frame);
+            int tupleIndex = ((FrameDataException) e).getTupleIndex();
+            try {
+                logExceptionCausingTuple(tupleIndex, e);
+            } catch (Exception ex) {
+                // Logging is best effort. Do not call ex.addSuppressed(e): if the helper rethrows e itself,
+                // self-suppression throws IllegalArgumentException and the bad tuple would never be removed.
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.log(Level.WARNING, "Unable to log exception causing tuple due to..." + ex.getMessage(),
+                            ex);
+                }
+            }
+            return FeedFrameUtil.removeBadTuple(ctx, tupleIndex, fta);
+        } catch (Exception exception) {
+            // Route the stack trace through the logger instead of printStackTrace().
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.log(Level.WARNING, "Unable to handle exception " + exception.getMessage(), exception);
+            }
+            return null;
+        }
+    }
+
+    /**
+     * Placeholder for logging the tuple that caused the exception; currently a no-op.
+     */
+    private void logExceptionCausingTuple(int tupleIndex, Exception e) throws HyracksDataException, AsterixException {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameCache.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameCache.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameCache.java
new file mode 100644
index 0000000..cd040c9
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameCache.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.dataflow;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.asterix.external.feed.message.MessageReceiver;
+import org.apache.asterix.external.util.FeedConstants.StatisticsConstants;
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+
+/**
+ * Allows caching of feed frames. This class is used in providing upstream backup.
+ * The tuples at the intake layer are held in this cache until these are acked by
+ * the storage layer post their persistence. On receiving an ack, appropriate tuples
+ * (recordsId < ackedRecordId) are dropped from the cache.
+ */
+public class FeedFrameCache extends MessageReceiver<ByteBuffer> {
+
+    /**
+     * Value represents a cache feed frame
+     * Key represents the largest record Id in the frame.
+     * At the intake side, the largest record id corresponds to the last record in the frame
+     **/
+    private final Map<Integer, ByteBuffer> orderedCache;
+    private final FrameTupleAccessor tupleAccessor;
+    private final IFrameWriter frameWriter;
+    private final IHyracksTaskContext ctx;
+
+    public FeedFrameCache(IHyracksTaskContext ctx, FrameTupleAccessor tupleAccessor, IFrameWriter frameWriter) {
+        this.tupleAccessor = tupleAccessor;
+        this.frameWriter = frameWriter;
+        /** A LinkedHashMap ensures entries are retrieved in order of their insertion **/
+        this.orderedCache = new LinkedHashMap<Integer, ByteBuffer>();
+        this.ctx = ctx;
+    }
+
+    @Override
+    public void processMessage(ByteBuffer frame) throws Exception {
+        int lastRecordId = getLastRecordId(frame);
+        ByteBuffer clone = cloneFrame(frame);
+        orderedCache.put(lastRecordId, clone);
+    }
+
+    public void dropTillRecordId(int recordId) {
+        List<Integer> dropRecordIds = new ArrayList<Integer>();
+        for (Entry<Integer, ByteBuffer> entry : orderedCache.entrySet()) {
+            int recId = entry.getKey();
+            if (recId <= recordId) {
+                dropRecordIds.add(recId);
+            } else {
+                break;
+            }
+        }
+        for (Integer r : dropRecordIds) {
+            orderedCache.remove(r);
+        }
+    }
+
+    public void replayRecords(int startingRecordId) throws HyracksDataException {
+        boolean replayPositionReached = false;
+        for (Entry<Integer, ByteBuffer> entry : orderedCache.entrySet()) {
+            // the key increases monotonically
+            int maxRecordIdInFrame = entry.getKey();
+            if (!replayPositionReached) {
+                if (startingRecordId < maxRecordIdInFrame) {
+                    replayFrame(startingRecordId, entry.getValue());
+                    break;
+                } else {
+                    continue;
+                }
+            }
+        }
+    }
+
+    /**
+     * Replay the frame from the tuple (inclusive) with recordId as specified.
+     * 
+     * @param recordId
+     * @param frame
+     * @throws HyracksDataException
+     */
+    private void replayFrame(int recordId, ByteBuffer frame) throws HyracksDataException {
+        tupleAccessor.reset(frame);
+        int nTuples = tupleAccessor.getTupleCount();
+        for (int i = 0; i < nTuples; i++) {
+            int rid = getRecordIdAtTupleIndex(i, frame);
+            if (rid == recordId) {
+                ByteBuffer slicedFrame = splitFrame(i, frame);
+                replayFrame(slicedFrame);
+                break;
+            }
+        }
+    }
+
+    private ByteBuffer splitFrame(int beginTupleIndex, ByteBuffer frame) throws HyracksDataException {
+        IFrame slicedFrame = new VSizeFrame(ctx);
+        FrameTupleAppender appender = new FrameTupleAppender();
+        appender.reset(slicedFrame, true);
+        int totalTuples = tupleAccessor.getTupleCount();
+        for (int ti = beginTupleIndex; ti < totalTuples; ti++) {
+            appender.append(tupleAccessor, ti);
+        }
+        return slicedFrame.getBuffer();
+    }
+
+    /**
+     * Replay the frame
+     * 
+     * @param frame
+     * @throws HyracksDataException
+     */
+    private void replayFrame(ByteBuffer frame) throws HyracksDataException {
+        frameWriter.nextFrame(frame);
+    }
+
+    private int getLastRecordId(ByteBuffer frame) {
+        tupleAccessor.reset(frame);
+        int nTuples = tupleAccessor.getTupleCount();
+        return getRecordIdAtTupleIndex(nTuples - 1, frame);
+    }
+
+    private int getRecordIdAtTupleIndex(int tupleIndex, ByteBuffer frame) {
+        tupleAccessor.reset(frame);
+        int recordStart = tupleAccessor.getTupleStartOffset(tupleIndex) + tupleAccessor.getFieldSlotsLength();
+        int openPartOffset = frame.getInt(recordStart + 6);
+        int numOpenFields = frame.getInt(recordStart + openPartOffset);
+        int recordIdOffset = frame.getInt(recordStart + openPartOffset + 4 + numOpenFields * 8
+                + StatisticsConstants.INTAKE_TUPLEID.length() + 2 + 1);
+        int lastRecordId = frame.getInt(recordStart + recordIdOffset);
+        return lastRecordId;
+    }
+
+    private ByteBuffer cloneFrame(ByteBuffer frame) {
+        ByteBuffer clone = ByteBuffer.allocate(frame.capacity());
+        System.arraycopy(frame.array(), 0, clone.array(), 0, frame.limit());
+        return clone;
+    }
+
+    public void replayAll() throws HyracksDataException {
+        for (Entry<Integer, ByteBuffer> entry : orderedCache.entrySet()) {
+            ByteBuffer frame = entry.getValue();
+            frameWriter.nextFrame(frame);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameCollector.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameCollector.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameCollector.java
new file mode 100644
index 0000000..0d53524
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameCollector.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.dataflow;
+
+import java.nio.ByteBuffer;
+import java.util.logging.Level;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.message.MessageReceiver;
+import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class FeedFrameCollector extends MessageReceiver<DataBucket> {
+
+    private final FeedConnectionId connectionId;
+    private final FrameDistributor frameDistributor;
+    private FeedPolicyAccessor fpa;
+    private IFrameWriter frameWriter;
+    private State state;
+
+    public enum State {
+        ACTIVE,
+        FINISHED,
+        TRANSITION,
+        HANDOVER
+    }
+
+    public FeedFrameCollector(FrameDistributor frameDistributor, FeedPolicyAccessor feedPolicyAccessor,
+            IFrameWriter frameWriter, FeedConnectionId connectionId) {
+        super();
+        this.frameDistributor = frameDistributor;
+        this.fpa = feedPolicyAccessor;
+        this.connectionId = connectionId;
+        this.frameWriter = frameWriter;
+        this.state = State.ACTIVE;
+    }
+
+    @Override
+    public void processMessage(DataBucket bucket) throws Exception {
+        try {
+            ByteBuffer frame = bucket.getContent();
+            switch (bucket.getContentType()) {
+                case DATA:
+                    frameWriter.nextFrame(frame);
+                    break;
+                case EOD:
+                    closeCollector();
+                    break;
+                case EOSD:
+                    throw new AsterixException("Received data bucket with content of type " + bucket.getContentType());
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("Unable to process data bucket " + bucket + ", encountered exception " + e.getMessage());
+            }
+        } finally {
+            bucket.doneReading();
+        }
+    }
+
+    public void closeCollector() {
+        if (state.equals(State.TRANSITION)) {
+            super.close(true);
+            setState(State.ACTIVE);
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info(this + " is now " + State.ACTIVE + " mode, processing frames synchronously");
+            }
+        } else {
+            flushPendingMessages();
+            setState(State.FINISHED);
+            synchronized (frameDistributor.getRegisteredCollectors()) {
+                frameDistributor.getRegisteredCollectors().notifyAll();
+            }
+            disconnect();
+        }
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Closed collector " + this);
+        }
+    }
+
+    public synchronized void disconnect() {
+        setState(State.FINISHED);
+    }
+
+    public synchronized void nextFrame(ByteBuffer frame) throws HyracksDataException {
+        frameWriter.nextFrame(frame);
+    }
+
+    public FeedPolicyAccessor getFeedPolicyAccessor() {
+        return fpa;
+    }
+
+    public synchronized State getState() {
+        return state;
+    }
+
+    public synchronized void setState(State state) {
+        this.state = state;
+        switch (state) {
+            case FINISHED:
+            case HANDOVER:
+                notifyAll();
+                break;
+            default:
+                break;
+        }
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Frame Collector " + this.frameDistributor.getFeedRuntimeType() + " switched to " + state);
+        }
+    }
+
+    public IFrameWriter getFrameWriter() {
+        return frameWriter;
+    }
+
+    public void setFrameWriter(IFrameWriter frameWriter) {
+        this.frameWriter = frameWriter;
+    }
+
+    @Override
+    public String toString() {
+        return "FrameCollector " + connectionId + "," + state + "]";
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o instanceof FeedFrameCollector) {
+            return connectionId.equals(((FeedFrameCollector) o).connectionId);
+        }
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        return connectionId.toString().hashCode();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameDiscarder.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameDiscarder.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameDiscarder.java
new file mode 100644
index 0000000..53ee475
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameDiscarder.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.dataflow;
+
+import java.nio.ByteBuffer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Decides whether an incoming frame may be discarded, keeping the number of discarded frames
+ * within the fraction of processed frames allowed by the feed policy.
+ */
+public class FeedFrameDiscarder {
+
+    // Named after this class; it was previously registered under FeedFrameSpiller by mistake.
+    private static final Logger LOGGER = Logger.getLogger(FeedFrameDiscarder.class.getName());
+
+    private final FeedRuntimeInputHandler inputHandler;
+    private final FeedConnectionId connectionId;
+    private final FeedRuntimeId runtimeId;
+    private final FeedPolicyAccessor policyAccessor;
+    private final float maxFractionDiscard;
+    private int nDiscarded;
+
+    /**
+     * Captures the policy's maximum discard fraction at construction time.
+     */
+    public FeedFrameDiscarder(FeedConnectionId connectionId, FeedRuntimeId runtimeId, FeedPolicyAccessor policyAccessor,
+            FeedRuntimeInputHandler inputHandler) throws HyracksDataException {
+        this.connectionId = connectionId;
+        this.runtimeId = runtimeId;
+        this.policyAccessor = policyAccessor;
+        this.inputHandler = inputHandler;
+        this.maxFractionDiscard = policyAccessor.getMaxFractionDiscard();
+    }
+
+    /**
+     * Accounts for one frame that the caller would like to discard.
+     *
+     * @param message
+     *            the candidate frame; its content is not inspected
+     * @return {@code true} if the frame was counted as discarded, {@code false} if discarding is
+     *         disabled by the policy or the discard limit has been reached
+     */
+    public boolean processMessage(ByteBuffer message) {
+        if (policyAccessor.getMaxFractionDiscard() == 0) {
+            return false;
+        }
+        long nProcessed = inputHandler.getProcessed();
+        // The limit grows with the number of frames the input handler reports as processed.
+        long discardLimit = (long) (nProcessed * maxFractionDiscard);
+        if (nDiscarded >= discardLimit) {
+            return false;
+        }
+        nDiscarded++;
+        if (LOGGER.isLoggable(Level.WARNING)) {
+            LOGGER.warning("Discarded frame by " + connectionId + " (" + runtimeId + ")" + " count so far  ("
+                    + nDiscarded + ") Limit [" + discardLimit + "]");
+        }
+        return true;
+    }
+
+}



Mime
View raw message