asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sjaco...@apache.org
Subject [13/24] incubator-asterixdb git commit: Introduces Feeds 2.0
Date Mon, 29 Jun 2015 19:45:10 GMT
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/NodeLoadReportService.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/NodeLoadReportService.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/NodeLoadReportService.java
new file mode 100644
index 0000000..383f869
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/NodeLoadReportService.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.lang.management.OperatingSystemMXBean;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import edu.uci.ics.asterix.common.feeds.api.IFeedManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMessageService;
+import edu.uci.ics.asterix.common.feeds.api.IFeedService;
+import edu.uci.ics.asterix.common.feeds.message.NodeReportMessage;
+
+public class NodeLoadReportService implements IFeedService {
+
+    private static final int NODE_LOAD_REPORT_FREQUENCY = 2000;
+    private static final float CPU_CHANGE_THRESHOLD = 0.2f;
+    private static final float HEAP_CHANGE_THRESHOLD = 0.4f;
+
+    private final String nodeId;
+    private final NodeLoadReportTask task;
+    private final Timer timer;
+
+    public NodeLoadReportService(String nodeId, IFeedManager feedManager) {
+        this.nodeId = nodeId;
+        this.task = new NodeLoadReportTask(nodeId, feedManager);
+        this.timer = new Timer();
+    }
+
+    @Override
+    public void start() throws Exception {
+        timer.schedule(task, 0, NODE_LOAD_REPORT_FREQUENCY);
+    }
+
+    @Override
+    public void stop() {
+        timer.cancel();
+    }
+
+    private static class NodeLoadReportTask extends TimerTask {
+
+        private final String nodeId;
+        private final IFeedManager feedManager;
+        private final NodeReportMessage message;
+        private final IFeedMessageService messageService;
+
+        private static OperatingSystemMXBean osBean = ManagementFactory.getOperatingSystemMXBean();
+        private static MemoryMXBean memBean = ManagementFactory.getMemoryMXBean();
+
+        public NodeLoadReportTask(String nodeId, IFeedManager feedManager) {
+            this.nodeId = nodeId;
+            this.feedManager = feedManager;
+            this.message = new NodeReportMessage(0.0f, 0L, 0);
+            this.messageService = feedManager.getFeedMessageService();
+        }
+
+        @Override
+        public void run() {
+            List<FeedRuntimeId> runtimeIds = feedManager.getFeedConnectionManager().getRegisteredRuntimes();
+            int nRuntimes = runtimeIds.size();
+            double cpuLoad = getCpuLoad();
+            double usedHeap = getUsedHeap();
+            if (sendMessage(nRuntimes, cpuLoad, usedHeap)) {
+                message.reset(cpuLoad, usedHeap, nRuntimes);
+                messageService.sendMessage(message);
+            }
+        }
+
+        private boolean sendMessage(int nRuntimes, double cpuLoad, double usedHeap) {
+            if (message == null) {
+                return true;
+            }
+
+            boolean changeInCpu = (Math.abs(cpuLoad - message.getCpuLoad()) / message.getCpuLoad()) > CPU_CHANGE_THRESHOLD;
+            boolean changeInUsedHeap = (Math.abs(usedHeap - message.getUsedHeap()) / message.getUsedHeap()) > HEAP_CHANGE_THRESHOLD;
+            boolean changeInRuntimeSize = nRuntimes != message.getnRuntimes();
+            return changeInCpu || changeInUsedHeap || changeInRuntimeSize;
+        }
+
+        private double getCpuLoad() {
+            return osBean.getSystemLoadAverage();
+        }
+
+        private double getUsedHeap() {
+            return ((double) memBean.getHeapMemoryUsage().getUsed()) / memBean.getHeapMemoryUsage().getMax();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/Series.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/Series.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/Series.java
new file mode 100644
index 0000000..780ea03
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/Series.java
@@ -0,0 +1,26 @@
+package edu.uci.ics.asterix.common.feeds;
+
+import edu.uci.ics.asterix.common.feeds.api.IFeedMetricCollector.MetricType;
+
+public abstract class Series {
+
+    protected final MetricType type;
+    protected int runningSum;
+
+    public Series(MetricType type) {
+        this.type = type;
+    }
+
+    public abstract void addValue(int value);
+
+    public int getRunningSum() {
+        return runningSum;
+    }
+
+    public MetricType getType() {
+        return type;
+    }
+
+    public abstract void reset();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/SeriesAvg.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/SeriesAvg.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/SeriesAvg.java
new file mode 100644
index 0000000..f9e33d6
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/SeriesAvg.java
@@ -0,0 +1,29 @@
+package edu.uci.ics.asterix.common.feeds;
+
+import edu.uci.ics.asterix.common.feeds.api.IFeedMetricCollector.MetricType;
+
+public class SeriesAvg extends Series {
+
+    private int count;
+
+    public SeriesAvg() {
+        super(MetricType.AVG);
+    }
+
+    public int getAvg() {
+        return runningSum / count;
+    }
+
+    public synchronized void addValue(int value) {
+        if (value < 0) {
+            return;
+        }
+        runningSum += value;
+        count++;
+    }
+    
+    public  void reset(){
+        count = 0;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/SeriesRate.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/SeriesRate.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/SeriesRate.java
new file mode 100644
index 0000000..8271462
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/SeriesRate.java
@@ -0,0 +1,74 @@
+package edu.uci.ics.asterix.common.feeds;
+
+import java.util.Timer;
+import java.util.TimerTask;
+
+import edu.uci.ics.asterix.common.feeds.api.IFeedMetricCollector.MetricType;
+
+public class SeriesRate extends Series {
+
+    private static final long REFRESH_MEASUREMENT = 5000; // 5 seconds
+
+    private int rate;
+    private Timer timer;
+    private RateComputingTask task;
+
+    public SeriesRate() {
+        super(MetricType.RATE);
+        begin();
+    }
+
+    public int getRate() {
+        return rate;
+    }
+
+    public synchronized void addValue(int value) {
+        if (value < 0) {
+            return;
+        }
+        runningSum += value;
+    }
+
+    public void begin() {
+        if (timer == null) {
+            timer = new Timer();
+            task = new RateComputingTask(this);
+            timer.scheduleAtFixedRate(task, 0, REFRESH_MEASUREMENT);
+        }
+    }
+
+    public void end() {
+        if (timer != null) {
+            timer.cancel();
+        }
+    }
+
+    public void reset() {
+        rate = 0;
+        if (task != null) {
+            task.reset();
+        }
+    }
+
+    private class RateComputingTask extends TimerTask {
+
+        private int lastMeasured = 0;
+        private final SeriesRate series;
+
+        public RateComputingTask(SeriesRate series) {
+            this.series = series;
+        }
+
+        @Override
+        public void run() {
+            int currentValue = series.getRunningSum();
+            rate = (int) (((currentValue - lastMeasured) * 1000) / REFRESH_MEASUREMENT);
+            lastMeasured = currentValue;
+        }
+
+        public void reset() {
+            lastMeasured = 0;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/StorageFrameHandler.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/StorageFrameHandler.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/StorageFrameHandler.java
new file mode 100644
index 0000000..e3348cf
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/StorageFrameHandler.java
@@ -0,0 +1,100 @@
+package edu.uci.ics.asterix.common.feeds;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import edu.uci.ics.asterix.common.feeds.FeedConstants.StatisticsConstants;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+public class StorageFrameHandler {
+
+    private final Map<Integer, Map<Integer, IntakePartitionStatistics>> intakeStatistics;
+    private long avgDelayPersistence;
+
+    public StorageFrameHandler() {
+        intakeStatistics = new HashMap<Integer, Map<Integer, IntakePartitionStatistics>>();
+        avgDelayPersistence = 0L;
+    }
+
+    public synchronized void updateTrackingInformation(ByteBuffer frame, FrameTupleAccessor frameAccessor) {
+        int nTuples = frameAccessor.getTupleCount();
+        long delay = 0;
+        long intakeTimestamp;
+        long currentTime = System.currentTimeMillis();
+        int partition = 0;
+        int recordId = 0;
+        for (int i = 0; i < nTuples; i++) {
+            int recordStart = frameAccessor.getTupleStartOffset(i) + frameAccessor.getFieldSlotsLength();
+            int openPartOffsetOrig = frame.getInt(recordStart + 6);
+            int numOpenFields = frame.getInt(recordStart + openPartOffsetOrig);
+
+            int recordIdOffset = openPartOffsetOrig + 4 + 8 * numOpenFields
+                    + (StatisticsConstants.INTAKE_TUPLEID.length() + 2) + 1;
+            recordId = frame.getInt(recordStart + recordIdOffset);
+
+            int partitionOffset = recordIdOffset + 4 + (StatisticsConstants.INTAKE_PARTITION.length() + 2) + 1;
+            partition = frame.getInt(recordStart + partitionOffset);
+
+            ackRecordId(partition, recordId);
+            int intakeTimestampValueOffset = partitionOffset + 4 + (StatisticsConstants.INTAKE_TIMESTAMP.length() + 2)
+                    + 1;
+            intakeTimestamp = frame.getLong(recordStart + intakeTimestampValueOffset);
+
+            int storeTimestampValueOffset = intakeTimestampValueOffset + 8
+                    + (StatisticsConstants.STORE_TIMESTAMP.length() + 2) + 1;
+            frame.putLong(recordStart + storeTimestampValueOffset, System.currentTimeMillis());
+            delay += currentTime - intakeTimestamp;
+        }
+        avgDelayPersistence = delay / nTuples;
+    }
+
+    private void ackRecordId(int partition, int recordId) {
+        Map<Integer, IntakePartitionStatistics> map = intakeStatistics.get(partition);
+        if (map == null) {
+            map = new HashMap<Integer, IntakePartitionStatistics>();
+            intakeStatistics.put(partition, map);
+        }
+        int base = (int) Math.ceil(recordId * 1.0 / IntakePartitionStatistics.ACK_WINDOW_SIZE);
+        IntakePartitionStatistics intakeStatsForBaseOfPartition = map.get(base);
+        if (intakeStatsForBaseOfPartition == null) {
+            intakeStatsForBaseOfPartition = new IntakePartitionStatistics(partition, base);
+            map.put(base, intakeStatsForBaseOfPartition);
+        }
+        intakeStatsForBaseOfPartition.ackRecordId(recordId);
+    }
+
+    public byte[] getAckData(int partition, int base) {
+        Map<Integer, IntakePartitionStatistics> intakeStats = intakeStatistics.get(partition);
+        if (intakeStats != null) {
+            IntakePartitionStatistics intakePartitionStats = intakeStats.get(base);
+            if (intakePartitionStats != null) {
+                return intakePartitionStats.getAckInfo();
+            }
+        }
+        return null;
+    }
+
+    public synchronized Map<Integer, IntakePartitionStatistics> getBaseAcksForPartition(int partition) {
+        Map<Integer, IntakePartitionStatistics> intakeStatsForPartition = intakeStatistics.get(partition);
+        Map<Integer, IntakePartitionStatistics> clone = new HashMap<Integer, IntakePartitionStatistics>();
+        for (Entry<Integer, IntakePartitionStatistics> entry : intakeStatsForPartition.entrySet()) {
+            clone.put(entry.getKey(), entry.getValue());
+        }
+        return intakeStatsForPartition;
+    }
+
+    public long getAvgDelayPersistence() {
+        return avgDelayPersistence;
+    }
+
+    public void setAvgDelayPersistence(long avgDelayPersistence) {
+        this.avgDelayPersistence = avgDelayPersistence;
+    }
+
+    public Set<Integer> getPartitionsWithStats() {
+        return intakeStatistics.keySet();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/StorageSideMonitoredBuffer.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/StorageSideMonitoredBuffer.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/StorageSideMonitoredBuffer.java
new file mode 100644
index 0000000..947fb32
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/StorageSideMonitoredBuffer.java
@@ -0,0 +1,189 @@
+package edu.uci.ics.asterix.common.feeds;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.asterix.common.feeds.FeedConstants.StatisticsConstants;
+import edu.uci.ics.asterix.common.feeds.api.IExceptionHandler;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMetricCollector;
+import edu.uci.ics.asterix.common.feeds.api.IFrameEventCallback;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+public class StorageSideMonitoredBuffer extends MonitoredBuffer {
+
+    private static final long STORAGE_TIME_TRACKING_FREQUENCY = 5000; // 10
+                                                                      // seconds
+
+    private boolean ackingEnabled;
+    private final boolean timeTrackingEnabled;
+
+    public StorageSideMonitoredBuffer(IHyracksTaskContext ctx, FeedRuntimeInputHandler inputHandler, IFrameWriter frameWriter,
+            FrameTupleAccessor fta,  RecordDescriptor recordDesc, IFeedMetricCollector metricCollector,
+            FeedConnectionId connectionId, FeedRuntimeId runtimeId, IExceptionHandler exceptionHandler,
+            IFrameEventCallback callback, int nPartitions, FeedPolicyAccessor policyAccessor) {
+        super(ctx, inputHandler, frameWriter, fta, recordDesc, metricCollector, connectionId, runtimeId,
+                exceptionHandler, callback, nPartitions, policyAccessor);
+        timeTrackingEnabled = policyAccessor.isTimeTrackingEnabled();
+        ackingEnabled = policyAccessor.atleastOnceSemantics();
+        if (ackingEnabled || timeTrackingEnabled) {
+            storageFromeHandler = new StorageFrameHandler();
+            this.storageTimeTrackingRateTask = new MonitoredBufferTimerTasks.MonitoredBufferStorageTimerTask(this,
+                    inputHandler.getFeedManager(), connectionId, runtimeId.getPartition(), policyAccessor,
+                    storageFromeHandler);
+            this.timer.scheduleAtFixedRate(storageTimeTrackingRateTask, 0, STORAGE_TIME_TRACKING_FREQUENCY);
+        }
+    }
+
+    @Override
+    protected boolean monitorProcessingRate() {
+        return false;
+    }
+
+    protected boolean logInflowOutflowRate() {
+        return true;
+    }
+
+    @Override
+    public IFramePreprocessor getFramePreProcessor() {
+        return new IFramePreprocessor() {
+
+            @Override
+            public void preProcess(ByteBuffer frame) {
+                try {
+                    if (ackingEnabled) {
+                        storageFromeHandler.updateTrackingInformation(frame, inflowFta);
+                    }
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        };
+    }
+
+    @Override
+    protected IFramePostProcessor getFramePostProcessor() {
+        return new IFramePostProcessor() {
+
+            private static final long NORMAL_WINDOW_LIMIT = 400 * 1000;
+            private static final long HIGH_WINDOW_LIMIT = 800 * 1000;
+            private static final long LOW_WINDOW_LIMIT = 1200 * 1000;
+
+            private long delayNormalWindow = 0;
+            private long delayHighWindow = 0;
+            private long delayLowWindow = 0;
+
+            private int countNormalWindow;
+            private int countHighWindow;
+            private int countLowWindow;
+
+            private long beginIntakeTimestamp = 0;
+
+            @Override
+            public void postProcessFrame(ByteBuffer frame, FrameTupleAccessor frameAccessor) {
+                if (ackingEnabled || timeTrackingEnabled) {
+                    int nTuples = frameAccessor.getTupleCount();
+                    long intakeTimestamp;
+                    long currentTime = System.currentTimeMillis();
+                    int partition = 0;
+                    int recordId = 0;
+                    for (int i = 0; i < nTuples; i++) {
+                        int recordStart = frameAccessor.getTupleStartOffset(i) + frameAccessor.getFieldSlotsLength();
+                        int openPartOffsetOrig = frame.getInt(recordStart + 6);
+                        int numOpenFields = frame.getInt(recordStart + openPartOffsetOrig);
+
+                        int recordIdOffset = openPartOffsetOrig + 4 + 8 * numOpenFields
+                                + (StatisticsConstants.INTAKE_TUPLEID.length() + 2) + 1;
+                        recordId = frame.getInt(recordStart + recordIdOffset);
+
+                        int partitionOffset = recordIdOffset + 4 + (StatisticsConstants.INTAKE_PARTITION.length() + 2)
+                                + 1;
+                        partition = frame.getInt(recordStart + partitionOffset);
+
+                        int intakeTimestampValueOffset = partitionOffset + 4
+                                + (StatisticsConstants.INTAKE_TIMESTAMP.length() + 2) + 1;
+                        intakeTimestamp = frame.getLong(recordStart + intakeTimestampValueOffset);
+                        if (beginIntakeTimestamp == 0) {
+                            beginIntakeTimestamp = intakeTimestamp;
+                            LOGGER.warning("Begin Timestamp: " + beginIntakeTimestamp);
+                        }
+
+                        updateRunningAvg(intakeTimestamp, currentTime);
+
+                        int storeTimestampValueOffset = intakeTimestampValueOffset + 8
+                                + (StatisticsConstants.STORE_TIMESTAMP.length() + 2) + 1;
+                        frame.putLong(recordStart + storeTimestampValueOffset, System.currentTimeMillis());
+                    }
+                    logRunningAvg();
+                    resetRunningAvg();
+                }
+            }
+
+            private void updateRunningAvg(long intakeTimestamp, long currentTime) {
+                long diffTimestamp = intakeTimestamp - beginIntakeTimestamp;
+                long delay = (currentTime - intakeTimestamp);
+                if (diffTimestamp < NORMAL_WINDOW_LIMIT) {
+                    delayNormalWindow += delay;
+                    countNormalWindow++;
+                } else if (diffTimestamp < HIGH_WINDOW_LIMIT) {
+                    delayHighWindow += delay;
+                    countHighWindow++;
+                } else {
+                    delayLowWindow += delay;
+                    countLowWindow++;
+                }
+            }
+
+            private void resetRunningAvg() {
+                delayNormalWindow = 0;
+                countNormalWindow = 0;
+                delayHighWindow = 0;
+                countHighWindow = 0;
+                delayLowWindow = 0;
+                countLowWindow = 0;
+            }
+
+            private void logRunningAvg() {
+                if (countNormalWindow != 0 && delayNormalWindow != 0) {
+                    LOGGER.warning("Window:" + 0 + ":" + "Avg Travel_Time:" + (delayNormalWindow / countNormalWindow));
+                }
+                if (countHighWindow != 0 && delayHighWindow != 0) {
+                    LOGGER.warning("Window:" + 1 + ":" + "Avg Travel_Time:" + (delayHighWindow / countHighWindow));
+                }
+                if (countLowWindow != 0 && delayLowWindow != 0) {
+                    LOGGER.warning("Window:" + 2 + ":" + "Avg Travel_Time:" + (delayLowWindow / countLowWindow));
+                }
+            }
+
+        };
+    }
+
+    public boolean isAckingEnabled() {
+        return ackingEnabled;
+    }
+
+    public void setAcking(boolean ackingEnabled) {
+        this.ackingEnabled = ackingEnabled;
+    }
+
+    public boolean isTimeTrackingEnabled() {
+        return timeTrackingEnabled;
+    }
+
+    @Override
+    protected boolean monitorInputQueueLength() {
+        return true;
+    }
+
+    @Override
+    protected boolean reportOutflowRate() {
+        return true;
+    }
+
+    @Override
+    protected boolean reportInflowRate() {
+        return false;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/SubscribableFeedRuntimeId.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/SubscribableFeedRuntimeId.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/SubscribableFeedRuntimeId.java
new file mode 100644
index 0000000..db958b3
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/SubscribableFeedRuntimeId.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds;
+
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
+
+public class SubscribableFeedRuntimeId extends FeedRuntimeId {
+
+    private final FeedId feedId;
+
+    public SubscribableFeedRuntimeId(FeedId feedId, FeedRuntimeType runtimeType, int partition) {
+        super(runtimeType, partition, FeedRuntimeId.DEFAULT_OPERAND_ID);
+        this.feedId = feedId;
+    }
+
+    public FeedId getFeedId() {
+        return feedId;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof SubscribableFeedRuntimeId)) {
+            return false;
+        }
+
+        return (super.equals(o) && this.feedId.equals(((SubscribableFeedRuntimeId) o).getFeedId()));
+    }
+
+    @Override
+    public int hashCode() {
+        return super.hashCode() + feedId.hashCode();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/SubscribableRuntime.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/SubscribableRuntime.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/SubscribableRuntime.java
new file mode 100644
index 0000000..7fa2869
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/SubscribableRuntime.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.feeds.api.ISubscribableRuntime;
+import edu.uci.ics.asterix.common.feeds.api.ISubscriberRuntime;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+public class SubscribableRuntime extends FeedRuntime implements ISubscribableRuntime {
+
+    protected static final Logger LOGGER = Logger.getLogger(SubscribableRuntime.class.getName());
+
+    protected final FeedId feedId;
+    protected final List<ISubscriberRuntime> subscribers;
+    protected final RecordDescriptor recordDescriptor;
+    protected final DistributeFeedFrameWriter dWriter;
+
+    public SubscribableRuntime(FeedId feedId, FeedRuntimeId runtimeId, FeedRuntimeInputHandler inputHandler,
+            DistributeFeedFrameWriter dWriter, RecordDescriptor recordDescriptor) {
+        super(runtimeId, inputHandler, dWriter);
+        this.feedId = feedId;
+        this.recordDescriptor = recordDescriptor;
+        this.dWriter = dWriter;
+        this.subscribers = new ArrayList<ISubscriberRuntime>();
+    }
+
+    public FeedId getFeedId() {
+        return feedId;
+    }
+
+    @Override
+    public String toString() {
+        return "SubscribableRuntime" + " [" + feedId + "]" + "(" + runtimeId + ")";
+    }
+
+    @Override
+    public synchronized void subscribeFeed(FeedPolicyAccessor fpa, CollectionRuntime collectionRuntime)
+            throws Exception {
+        FeedFrameCollector collector = dWriter.subscribeFeed(new FeedPolicyAccessor(collectionRuntime.getFeedPolicy()),
+                collectionRuntime.getInputHandler(), collectionRuntime.getConnectionId());
+        collectionRuntime.setFrameCollector(collector);
+        subscribers.add(collectionRuntime);
+    }
+
+    @Override
+    public synchronized void unsubscribeFeed(CollectionRuntime collectionRuntime) throws Exception {
+        dWriter.unsubscribeFeed(collectionRuntime.getFeedFrameWriter());
+        subscribers.remove(collectionRuntime);
+    }
+
+    @Override
+    public synchronized List<ISubscriberRuntime> getSubscribers() {
+        return subscribers;
+    }
+
+    @Override
+    public DistributeFeedFrameWriter getFeedFrameWriter() {
+        return dWriter;
+    }
+
+    public FeedRuntimeType getFeedRuntimeType() {
+        return runtimeId.getFeedRuntimeType();
+    }
+
+    @Override
+    public RecordDescriptor getRecordDescriptor() {
+        return recordDescriptor;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/SuperFeedManager.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/SuperFeedManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/SuperFeedManager.java
deleted file mode 100644
index 37306a0..0000000
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/SuperFeedManager.java
+++ /dev/null
@@ -1,447 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.common.feeds;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * The feed operators running in an NC report their health (statistics) to the local Feed Manager.
- * A feed thus has a Feed Manager per NC. From amongst the Feed Maanger, a SuperFeedManager is chosen (randomly)
- * The SuperFeedManager collects reports from the FeedMaangers and has the global cluster view in terms of
- * how different feed operators running in a distributed fashion are performing.
- */
-public class SuperFeedManager {
-
-    private static final Logger LOGGER = Logger.getLogger(SuperFeedManager.class.getName());
-
-    /**
-     * IP Address or DNS name of the host where Super Feed Manager is running.
-     */
-    private String host;
-
-    private AtomicInteger availablePort; // starting value is fixed
-
-    /**
-     * The port at which the SuperFeedManager listens for connections by other Feed Managers.
-     */
-    private final int feedReportPort; // fixed
-
-    /**
-     * The port at which the SuperFeedManager listens for connections by clients that wish
-     * to subscribe to the feed health reports.E.g. feed management console.
-     */
-    private final int feedReportSubscribePort; // fixed
-
-    /**
-     * The Id of Node Controller
-     */
-    private final String nodeId;
-
-    /**
-     * A unique identifier for the feed instance. A feed instance represents the flow of data
-     * from a feed to a dataset.
-     **/
-    private final FeedConnectionId feedConnectionId;
-
-    /**
-     * Set to true of the Super Feed Manager is local to the NC.
-     **/
-    private boolean isLocal = false;
-
-    private FeedReportDestinationSocketProvider sfmService;
-
-    private SuperFeedReportSubscriptionService subscriptionService;
-
-    private LinkedBlockingQueue<String> feedReportInbox; ///
-
-    private boolean started = false;
-
-    private final IFeedManager feedManager;
-
-    public static final int PORT_RANGE_ASSIGNED = 10;
-
-    public enum FeedReportMessageType {
-        CONGESTION,
-        THROUGHPUT
-    }
-
-    public SuperFeedManager(FeedConnectionId feedId, String host, String nodeId, int port, IFeedManager feedManager)
-            throws Exception {
-        this.feedConnectionId = feedId;
-        this.feedManager = feedManager;
-        this.nodeId = nodeId;
-        this.feedReportPort = port;
-        this.feedReportSubscribePort = port + 1;
-        this.availablePort = new AtomicInteger(feedReportSubscribePort + 1);
-        this.host = host;
-        this.feedReportInbox = new LinkedBlockingQueue<String>();
-    }
-
-    public int getPort() {
-        return feedReportPort;
-    }
-
-    public String getHost() throws Exception {
-        return host;
-    }
-
-    public String getNodeId() {
-        return nodeId;
-    }
-
-    public FeedConnectionId getFeedConnectionId() {
-        return feedConnectionId;
-    }
-
-    public boolean isLocal() {
-        return isLocal;
-    }
-
-    public void setLocal(boolean isLocal) {
-        this.isLocal = isLocal;
-    }
-
-    public void start() throws IOException {
-        if (sfmService == null) {
-            ExecutorService executorService = feedManager.getFeedExecutorService(feedConnectionId);
-            sfmService = new FeedReportDestinationSocketProvider(feedReportPort, feedReportInbox, feedConnectionId,
-                    availablePort, feedManager);
-            executorService.execute(sfmService);
-            subscriptionService = new SuperFeedReportSubscriptionService(feedConnectionId, feedReportSubscribePort,
-                    sfmService.getMesgAnalyzer(), availablePort, feedManager);
-            executorService.execute(subscriptionService);
-        }
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Started super feed manager! " + this);
-        }
-        started = true;
-    }
-
-    public void stop() throws IOException {
-        sfmService.stop();
-        subscriptionService.stop();
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Stopped super feed manager! " + this);
-        }
-        started = false;
-    }
-
-    public boolean isStarted() {
-        return started;
-    }
-
-    @Override
-    public String toString() {
-        return feedConnectionId + "[" + nodeId + "(" + host + ")" + ":" + feedReportPort + "]"
-                + (isLocal ? started ? "Started " : "Not Started" : " Remote ");
-    }
-
-    public AtomicInteger getAvailablePort() {
-        return availablePort;
-    }
-
-    private static class SuperFeedReportSubscriptionService implements Runnable {
-
-        private final FeedConnectionId feedId;
-        private ServerSocket serverFeedSubscribe;
-        private AtomicInteger subscriptionPort;
-        private boolean active = true;
-        private String EOM = "\n";
-        private final FeedReportProvider reportProvider;
-        private final List<FeedDataProviderService> dataProviders = new ArrayList<FeedDataProviderService>();
-        private final IFeedManager feedManager;
-
-        public SuperFeedReportSubscriptionService(FeedConnectionId feedId, int port, FeedReportProvider reportProvider,
-                AtomicInteger nextPort, IFeedManager feedManager) throws IOException {
-            this.feedId = feedId;
-            serverFeedSubscribe = feedManager.getFeedRuntimeManager(feedId).createServerSocket(port);
-            this.subscriptionPort = nextPort;
-            this.reportProvider = reportProvider;
-            this.feedManager = feedManager;
-        }
-
-        public void stop() {
-            active = false;
-            for (FeedDataProviderService dataProviderService : dataProviders) {
-                dataProviderService.stop();
-            }
-        }
-
-        @Override
-        public void run() {
-            while (active) {
-                try {
-                    Socket client = serverFeedSubscribe.accept();
-                    OutputStream os = client.getOutputStream();
-                    int port = subscriptionPort.incrementAndGet();
-                    LinkedBlockingQueue<String> reportInbox = new LinkedBlockingQueue<String>();
-                    reportProvider.registerSubsription(reportInbox);
-                    FeedDataProviderService dataProviderService = new FeedDataProviderService(feedId, port,
-                            reportInbox, feedManager);
-                    dataProviders.add(dataProviderService);
-                    feedManager.getFeedRuntimeManager(feedId).getExecutorService().execute(dataProviderService);
-                    if (LOGGER.isLoggable(Level.INFO)) {
-                        LOGGER.info("Recevied subscription request for feed :" + feedId
-                                + " Subscripton available at port " + subscriptionPort);
-                    }
-                    os.write((port + EOM).getBytes());
-                    os.flush();
-                } catch (IOException e) {
-                    e.printStackTrace();
-                }
-            }
-        }
-    }
-
-    private static class FeedDataProviderService implements Runnable {
-
-        private final FeedConnectionId feedId;
-        private final IFeedManager feedManager;
-        private int subscriptionPort;
-        private ServerSocket dataProviderSocket;
-        private LinkedBlockingQueue<String> inbox;
-        private boolean active = true;
-        private String EOM = "\n";
-
-        public FeedDataProviderService(FeedConnectionId feedId, int port, LinkedBlockingQueue<String> inbox,
-                IFeedManager feedManager) throws IOException {
-            this.feedId = feedId;
-            this.subscriptionPort = port;
-            this.inbox = inbox;
-            dataProviderSocket = feedManager.getFeedRuntimeManager(feedId).createServerSocket(port);
-            this.feedManager = feedManager;
-        }
-
-        @Override
-        public void run() {
-            try {
-                Socket client = dataProviderSocket.accept();
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Subscriber to " + feedId + " data connected");
-                }
-                OutputStream os = client.getOutputStream();
-                while (active) {
-                    String message = inbox.take();
-                    os.write((message + EOM).getBytes());
-                }
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Unsubscribed from " + feedId + " disconnected");
-                }
-            } catch (IOException e) {
-                e.printStackTrace();
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-            }
-        }
-
-        public void stop() {
-            active = false;
-        }
-
-        @Override
-        public String toString() {
-            return "DATA_PROVIDER_" + feedId + "[" + subscriptionPort + "]";
-        }
-
-    }
-
-    private static class FeedReportDestinationSocketProvider implements Runnable {
-
-        private static final String EOM = "\n";
-
-        private AtomicInteger nextPort;
-        private final ServerSocket feedReportSocket;
-        private final LinkedBlockingQueue<String> inbox;
-        private final List<MessageListener> messageListeners;
-        private final FeedReportProvider mesgAnalyzer;
-        private final FeedConnectionId feedId;
-        private boolean process = true;
-
-        public FeedReportDestinationSocketProvider(int port, LinkedBlockingQueue<String> inbox,
-                FeedConnectionId feedId, AtomicInteger availablePort, IFeedManager feedManager) throws IOException {
-            FeedRuntimeManager runtimeManager = feedManager.getFeedRuntimeManager(feedId);
-            this.feedReportSocket = runtimeManager.createServerSocket(port);
-            this.nextPort = availablePort;
-            this.inbox = inbox;
-            this.feedId = feedId;
-            this.messageListeners = new ArrayList<MessageListener>();
-            this.mesgAnalyzer = new FeedReportProvider(inbox, feedId);
-            feedManager.getFeedExecutorService(feedId).execute(mesgAnalyzer);
-        }
-
-        public void stop() {
-            process = false;
-            if (feedReportSocket != null) {
-                try {
-                    feedReportSocket.close();
-                    process = false;
-                } catch (IOException e) {
-                    e.printStackTrace();
-                }
-            }
-            for (MessageListener listener : messageListeners) {
-                listener.stop();
-            }
-            mesgAnalyzer.stop();
-        }
-
-        @Override
-        public void run() {
-            Socket client = null;
-            while (process) {
-                try {
-                    client = feedReportSocket.accept();
-                    int port = nextPort.incrementAndGet();
-                    /**
-                     * MessageListener provides the functionality of listening at a port for messages
-                     * and delivering each received message to an input queue (inbox).
-                     */
-                    MessageListener listener = new MessageListener(port, inbox);
-                    listener.start();
-                    synchronized (messageListeners) {
-                        messageListeners.add(listener);
-                    }
-                    OutputStream os = client.getOutputStream();
-                    os.write((port + EOM).getBytes());
-                    os.flush();
-                } catch (IOException e) {
-                    if (process == false) {
-                        break;
-                    }
-                } finally {
-                    if (client != null) {
-                        try {
-                            client.close();
-                        } catch (Exception e) {
-                            e.printStackTrace();
-                        }
-                    }
-                }
-            }
-        }
-
-        public FeedReportProvider getMesgAnalyzer() {
-            return mesgAnalyzer;
-        }
-
-    }
-
-    /**
-     * The report messages sent by the feed operators are sent to the FeedReportProvider.
-     * FeedReportMessageAnalyzer is responsible for distributing the messages to the subscribers.
-     * The Feed Management Console is an example of a subscriber.
-     */
-    private static class FeedReportProvider implements Runnable {
-
-        private final LinkedBlockingQueue<String> inbox;
-        private final FeedConnectionId feedId;
-        private boolean process = true;
-        private final List<LinkedBlockingQueue<String>> subscriptionQueues;
-        private final Map<String, String> ingestionThroughputs;
-
-        public FeedReportProvider(LinkedBlockingQueue<String> inbox, FeedConnectionId feedId)
-                throws UnknownHostException, IOException {
-            this.inbox = inbox;
-            this.feedId = feedId;
-            this.subscriptionQueues = new ArrayList<LinkedBlockingQueue<String>>();
-            this.ingestionThroughputs = new HashMap<String, String>();
-        }
-
-        public void stop() {
-            process = false;
-        }
-
-        public void registerSubsription(LinkedBlockingQueue<String> subscriptionQueue) {
-            subscriptionQueues.add(subscriptionQueue);
-        }
-
-        public void deregisterSubsription(LinkedBlockingQueue<String> subscriptionQueue) {
-            subscriptionQueues.remove(subscriptionQueue);
-        }
-
-        public void run() {
-            StringBuilder finalMessage = new StringBuilder();
-            FeedReport report = new FeedReport();
-            while (process) {
-                try {
-                    String message = inbox.take();
-                    report.reset(message);
-                    FeedReportMessageType mesgType = report.getReportType();
-                    switch (mesgType) {
-                        case THROUGHPUT:
-                            if (LOGGER.isLoggable(Level.INFO)) {
-                                LOGGER.warning("Feed Health Report " + message);
-                            }
-                            String[] msgComponents = message.split("\\|");
-                            String partition = msgComponents[3];
-                            String tput = msgComponents[4];
-                            String timestamp = msgComponents[6];
-
-                            boolean dispatchReport = true;
-                            if (ingestionThroughputs.get(partition) == null) {
-                                ingestionThroughputs.put(partition, tput);
-                                dispatchReport = false;
-                            } else {
-                                for (int i = 0; i < ingestionThroughputs.size(); i++) {
-                                    String tp = ingestionThroughputs.get(i + "");
-                                    if (tp != null) {
-                                        ingestionThroughputs.put(i + "", null);
-                                        finalMessage.append(tp + FeedMessageService.MessageSeparator);
-                                    } else {
-                                        dispatchReport = false;
-                                        break;
-                                    }
-                                }
-                                ingestionThroughputs.put(partition, tput);
-                            }
-
-                            if (dispatchReport) {
-                                String dispatchedReport = finalMessage.toString();
-                                if (LOGGER.isLoggable(Level.INFO)) {
-                                    LOGGER.info("Dispatched report " + dispatchedReport);
-                                }
-                                for (LinkedBlockingQueue<String> q : subscriptionQueues) {
-                                    q.add(dispatchedReport);
-                                }
-                            }
-                            finalMessage.delete(0, finalMessage.length());
-                            break;
-                        case CONGESTION:
-                            // congestionInbox.add(report);
-                            break;
-                    }
-                } catch (InterruptedException e) {
-                    if (LOGGER.isLoggable(Level.INFO)) {
-                        LOGGER.info("Unable to process messages " + e.getMessage() + " for feed " + feedId);
-                    }
-                }
-            }
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IAdapterRuntimeManager.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IAdapterRuntimeManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IAdapterRuntimeManager.java
new file mode 100644
index 0000000..cb84646
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IAdapterRuntimeManager.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds.api;
+
+import edu.uci.ics.asterix.common.feeds.FeedId;
+
+public interface IAdapterRuntimeManager {
+
+    public enum State {
+        /**
+         * Indicates that AsterixDB is maintaining the flow of data from external source into its storage.
+         */
+        ACTIVE_INGESTION,
+
+        /**
+         * Indicates that data from external source is being buffered and not
+         * pushed downstream
+         */
+
+        INACTIVE_INGESTION,
+        /**
+         * Indicates that feed ingestion activity has finished.
+         */
+        FINISHED_INGESTION,
+
+        /** Indicates the occurrence of a failure during the intake stage of a data ingestion pipeline **/
+        FAILED_INGESTION
+    }
+
+    /**
+     * Start feed ingestion
+     * 
+     * @throws Exception
+     */
+    public void start() throws Exception;
+
+    /**
+     * Stop feed ingestion.
+     * 
+     * @throws Exception
+     */
+    public void stop() throws Exception;
+
+    /**
+     * @return feedId associated with the feed that is being ingested
+     */
+    public FeedId getFeedId();
+
+    /**
+     * @return the instance of the feed adapter (an implementation of {@code IFeedAdapter}) in use.
+     */
+    public IFeedAdapter getFeedAdapter();
+
+    /**
+     * @return state associated with the AdapterRuntimeManager. See {@code State}.
+     */
+    public State getState();
+
+    /**
+     * @param state
+     */
+    public void setState(State state);
+
+    public IIntakeProgressTracker getProgressTracker();
+
+    public int getPartition();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/ICentralFeedManager.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/ICentralFeedManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/ICentralFeedManager.java
new file mode 100644
index 0000000..b42335f
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/ICentralFeedManager.java
@@ -0,0 +1,16 @@
+package edu.uci.ics.asterix.common.feeds.api;
+
+import java.io.IOException;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+
+public interface ICentralFeedManager {
+
+    public void start() throws AsterixException;
+
+    public void stop() throws AsterixException, IOException;
+
+    public IFeedTrackingManager getFeedTrackingManager();
+
+    public IFeedLoadManager getFeedLoadManager();
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IDatasourceAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IDatasourceAdapter.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IDatasourceAdapter.java
new file mode 100644
index 0000000..36ce7cf
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IDatasourceAdapter.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds.api;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+
+/**
+ * A super interface implemented by a data source adapter. An adapter can be a
+ * pull based or push based. This interface provides all common APIs that need
+ * to be implemented by each adapter irrespective of the the kind of
+ * adapter(pull or push).
+ */
+public interface IDatasourceAdapter extends Serializable {
+
+    /**
+     * Triggers the adapter to begin ingesting data from the external source.
+     * 
+     * @param partition
+     *            The adapter could be running with a degree of parallelism.
+     *            partition corresponds to the i'th parallel instance.
+     * @param writer
+     *            The instance of frame writer that is used by the adapter to
+     *            write frame to. Adapter packs the fetched bytes (from external source),
+     *            packs them into frames and forwards the frames to an upstream receiving
+     *            operator using the instance of IFrameWriter.
+     * @throws Exception
+     */
+    public void start(int partition, IFrameWriter writer) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IExceptionHandler.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IExceptionHandler.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IExceptionHandler.java
new file mode 100644
index 0000000..4e7e4e3
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IExceptionHandler.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds.api;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Handles an exception encountered during processing of a data frame.
+ * In the case when the exception is of type {@code FrameDataException}, the causing
+ * tuple is logged and a new frame with tuple after the exception-generating tuple
+ * is returned. This funcitonality is used during feed ingestion to bypass an exception
+ * generating tuple and thus avoid the data flow from terminating
+ */
+public interface IExceptionHandler {
+
+    /**
+     * @param e
+     *            the exception that needs to be handled
+     * @param frame
+     *            the frame that was being processed when exception occurred
+     * @return returns a new frame with tuples after the exception generating tuple
+     * @throws HyracksDataException
+     */
+    public ByteBuffer handleException(Exception e, ByteBuffer frame);
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedAdapter.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedAdapter.java
new file mode 100644
index 0000000..bd2f6d3
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedAdapter.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds.api;
+
+/**
+ * Interface implemented by a feed adapter.
+ */
+public interface IFeedAdapter extends IDatasourceAdapter {
+
+    public enum DataExchangeMode {
+        /**
+         * PULL model requires the adaptor to make a separate request each time to receive data
+         **/
+        PULL,
+
+        /**
+         * PUSH mode involves the use o just one initial request (handshake) by the adaptor
+         * to the datasource for setting up the connection and providing any protocol-specific
+         * parameters. Once a connection is established, the data source "pushes" data to the adaptor.
+         **/
+        PUSH
+    }
+
+    /**
+     * Returns the data exchange mode (PULL/PUSH) associated with the flow.
+     * 
+     * @return
+     */
+    public DataExchangeMode getDataExchangeMode();
+
+    /**
+     * Discontinue the ingestion of data and end the feed.
+     * 
+     * @throws Exception
+     */
+    public void stop() throws Exception;
+
+    /**
+     * @param e
+     * @return true if the feed ingestion should continue post the exception else false
+     */
+    public boolean handleException(Exception e);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedConnectionManager.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedConnectionManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedConnectionManager.java
new file mode 100644
index 0000000..0f91b52
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedConnectionManager.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds.api;
+
+import java.io.IOException;
+import java.util.List;
+
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedRuntime;
+import edu.uci.ics.asterix.common.feeds.FeedRuntimeId;
+import edu.uci.ics.asterix.common.feeds.FeedRuntimeManager;
+
+/**
+ * Handle (de)registration of feeds for delivery of control messages.
+ */
+public interface IFeedConnectionManager {
+
+    /**
+     * Allows registration of a feedRuntime.
+     * 
+     * @param feedRuntime
+     * @throws Exception
+     */
+    public void registerFeedRuntime(FeedConnectionId connectionId, FeedRuntime feedRuntime) throws Exception;
+
+    /**
+     * Obtain feed runtime corresponding to a feedRuntimeId
+     * 
+     * @param feedRuntimeId
+     * @return
+     */
+    public FeedRuntime getFeedRuntime(FeedConnectionId connectionId, FeedRuntimeId feedRuntimeId);
+
+    /**
+     * De-register a feed
+     * 
+     * @param feedConnection
+     * @throws IOException
+     */
+    void deregisterFeed(FeedConnectionId feedConnection);
+
+    /**
+     * Obtain the feed runtime manager associated with a feed.
+     * 
+     * @param feedConnection
+     * @return
+     */
+    public FeedRuntimeManager getFeedRuntimeManager(FeedConnectionId feedConnection);
+
+    /**
+     * Allows de-registration of a feed runtime.
+     * 
+     * @param feedRuntimeId
+     */
+    void deRegisterFeedRuntime(FeedConnectionId connectionId, FeedRuntimeId feedRuntimeId);
+
+    public List<FeedRuntimeId> getRegisteredRuntimes();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedFrameHandler.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedFrameHandler.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedFrameHandler.java
new file mode 100644
index 0000000..057416a
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedFrameHandler.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds.api;
+
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+
+import edu.uci.ics.asterix.common.feeds.DataBucket;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface IFeedFrameHandler {
+
+    public void handleFrame(ByteBuffer frame) throws HyracksDataException;
+
+    public void handleDataBucket(DataBucket bucket);
+
+    public void close();
+
+    public Iterator<ByteBuffer> replayData() throws HyracksDataException;
+
+    public String getSummary();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedJoint.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedJoint.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedJoint.java
new file mode 100644
index 0000000..0c52a42
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedJoint.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2009-2014 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds.api;
+
+import java.util.List;
+
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedId;
+import edu.uci.ics.asterix.common.feeds.FeedJointKey;
+import edu.uci.ics.asterix.common.feeds.FeedConnectionRequest;
+import edu.uci.ics.asterix.common.feeds.api.IFeedLifecycleListener.ConnectionLocation;
+
+public interface IFeedJoint {
+
+    public enum FeedJointType {
+        /** Feed Joint is located at the intake stage of a primary feed **/
+        INTAKE,
+
+        /** Feed Joint is located at the compute stage of a primary/secondary feed **/
+        COMPUTE
+    }
+
+    public enum State {
+        /** Initial state of a feed joint post creation but prior to scheduling of corresponding Hyracks job. **/
+        CREATED,
+
+        /** State acquired post creation of Hyracks job and known physical locations of the joint **/
+        INITIALIZED,
+
+        /** State acquired post starting of Hyracks job at which point, data begins to flow through the joint **/
+        ACTIVE
+    }
+
+    /**
+     * @return the {@link State} associated with the FeedJoint
+     */
+    public State getState();
+
+    /**
+     * @return the {@link FeedJointType} associated with the FeedJoint
+     */
+    public FeedJointType getType();
+
+    /**
+     * @return the list of data receivers that are
+     *         receiving the data flowing through this FeedJoint
+     */
+    public List<FeedConnectionId> getReceivers();
+
+    /**
+     * @return the list of pending subscription request {@link FeedConnectionRequest} submitted for data flowing through the FeedJoint
+     */
+    public List<FeedConnectionRequest> getConnectionRequests();
+
+    /**
+     * @return the subscription location {@link ConnectionLocation} associated with the FeedJoint
+     */
+    public ConnectionLocation getConnectionLocation();
+
+    /**
+     * @return the unique {@link FeedJointKey} associated with the FeedJoint
+     */
+    public FeedJointKey getFeedJointKey();
+
+    /**
+     * Returns the feed subscriber {@link FeedSubscriber} corresponding to a given feed connection id.
+     * 
+     * @param feedConnectionId
+     *            the unique id of a feed connection
+     * @return an instance of feedConnectionId {@link FeedConnectionId}
+     */
+    public FeedConnectionId getReceiver(FeedConnectionId feedConnectionId);
+
+    /**
+     * @param active
+     */
+    public void setState(State active);
+
+    /**
+     * Remove the subscriber from the set of registered subscribers to the FeedJoint
+     * 
+     * @param connectionId
+     *            the connectionId that needs to be removed
+     */
+    public void removeReceiver(FeedConnectionId connectionId);
+
+    public FeedId getOwnerFeedId();
+
+    /**
+     * Add a feed connectionId to the set of registered subscribers
+     * 
+     * @param connectionId
+     */
+    public void addReceiver(FeedConnectionId connectionId);
+
+    /**
+     * Add a feed subscription request {@link FeedConnectionRequest} for the FeedJoint
+     * 
+     * @param request
+     */
+    public void addConnectionRequest(FeedConnectionRequest request);
+
+    public FeedConnectionId getProvider();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedLifecycleEventSubscriber.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedLifecycleEventSubscriber.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedLifecycleEventSubscriber.java
new file mode 100644
index 0000000..84deec9
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedLifecycleEventSubscriber.java
@@ -0,0 +1,19 @@
+package edu.uci.ics.asterix.common.feeds.api;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.feeds.FeedConnectJobInfo;
+
+public interface IFeedLifecycleEventSubscriber {
+
+    public enum FeedLifecycleEvent {
+        FEED_INTAKE_STARTED,
+        FEED_COLLECT_STARTED,
+        FEED_INTAKE_FAILURE,
+        FEED_COLLECT_FAILURE,
+        FEED_ENDED
+    }
+
+    public void assertEvent(FeedLifecycleEvent event) throws AsterixException, InterruptedException;
+
+    public void handleFeedEvent(FeedLifecycleEvent event);
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedLifecycleIntakeEventSubscriber.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedLifecycleIntakeEventSubscriber.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedLifecycleIntakeEventSubscriber.java
new file mode 100644
index 0000000..f857156
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedLifecycleIntakeEventSubscriber.java
@@ -0,0 +1,10 @@
+package edu.uci.ics.asterix.common.feeds.api;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.feeds.FeedIntakeInfo;
+
+public interface IFeedLifecycleIntakeEventSubscriber extends IFeedLifecycleEventSubscriber {
+
+    public void handleFeedEvent(FeedIntakeInfo iInfo, FeedLifecycleEvent event) throws AsterixException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedLifecycleListener.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedLifecycleListener.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedLifecycleListener.java
new file mode 100644
index 0000000..fb755c9
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedLifecycleListener.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds.api;
+
+import java.util.List;
+
+import edu.uci.ics.asterix.common.api.IClusterEventsSubscriber;
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedId;
+import edu.uci.ics.asterix.common.feeds.FeedJointKey;
+import edu.uci.ics.hyracks.api.job.IJobLifecycleListener;
+
+public interface IFeedLifecycleListener extends IJobLifecycleListener, IClusterEventsSubscriber {
+
+    public enum ConnectionLocation {
+        SOURCE_FEED_INTAKE_STAGE,
+        SOURCE_FEED_COMPUTE_STAGE
+    }
+
+    public IFeedJoint getAvailableFeedJoint(FeedJointKey feedJoinKey);
+
+    public boolean isFeedJointAvailable(FeedJointKey feedJoinKey);
+
+    public List<FeedConnectionId> getActiveFeedConnections(FeedId feedId);
+
+    public List<String> getComputeLocations(FeedId feedId);
+
+    public List<String> getIntakeLocations(FeedId feedId);
+
+    public List<String> getStoreLocations(FeedConnectionId feedId);
+
+    public void registerFeedEventSubscriber(FeedConnectionId connectionId, IFeedLifecycleEventSubscriber subscriber);
+
+    public void deregisterFeedEventSubscriber(FeedConnectionId connectionId, IFeedLifecycleEventSubscriber subscriber);
+
+    public List<String> getCollectLocations(FeedConnectionId feedConnectionId);
+
+    boolean isFeedConnectionActive(FeedConnectionId connectionId);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedLoadManager.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedLoadManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedLoadManager.java
new file mode 100644
index 0000000..bf9bbd0
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedLoadManager.java
@@ -0,0 +1,42 @@
+package edu.uci.ics.asterix.common.feeds.api;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.json.JSONException;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.feeds.FeedActivity;
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.NodeLoadReport;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
+import edu.uci.ics.asterix.common.feeds.message.FeedCongestionMessage;
+import edu.uci.ics.asterix.common.feeds.message.FeedReportMessage;
+import edu.uci.ics.asterix.common.feeds.message.ScaleInReportMessage;
+import edu.uci.ics.asterix.common.feeds.message.ThrottlingEnabledFeedMessage;
+
+public interface IFeedLoadManager {
+
+    public void submitNodeLoadReport(NodeLoadReport report);
+
+    public void reportCongestion(FeedCongestionMessage message) throws JSONException, AsterixException;
+
+    public void submitFeedRuntimeReport(FeedReportMessage message);
+
+    public void submitScaleInPossibleReport(ScaleInReportMessage sm) throws AsterixException, Exception;
+
+    public List<String> getNodes(int required);
+
+    public void reportThrottlingEnabled(ThrottlingEnabledFeedMessage mesg) throws AsterixException, Exception;
+
+    int getOutflowRate(FeedConnectionId connectionId, FeedRuntimeType runtimeType);
+
+    void reportFeedActivity(FeedConnectionId connectionId, FeedActivity activity);
+
+    void removeFeedActivity(FeedConnectionId connectionId);
+    
+    public FeedActivity getFeedActivity(FeedConnectionId connectionId);
+
+    public Collection<FeedActivity> getFeedActivities();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedManager.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedManager.java
new file mode 100644
index 0000000..a9368ae
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedManager.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds.api;
+
+import edu.uci.ics.asterix.common.config.AsterixFeedProperties;
+import edu.uci.ics.asterix.common.feeds.api.IFeedConnectionManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMemoryManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMessageService;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMetadataManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMetricCollector;
+import edu.uci.ics.asterix.common.feeds.api.IFeedSubscriptionManager;
+
+/**
+ * Provides access to services related to feed management within a node controller
+ */
+public interface IFeedManager {
+
+    /**
+     * gets the nodeId associated with the host node controller
+     * 
+     * @return the nodeId associated with the host node controller
+     */
+    public String getNodeId();
+
+    /**
+     * gets the handle to the singleton instance of subscription manager
+     * 
+     * @return the singleton instance of subscription manager
+     * @see IFeedSubscriptionManager
+     */
+    public IFeedSubscriptionManager getFeedSubscriptionManager();
+
+    /**
+     * gets the handle to the singleton instance of connection manager
+     * 
+     * @return the singleton instance of connection manager
+     * @see IFeedConnectionManager
+     */
+    public IFeedConnectionManager getFeedConnectionManager();
+
+    /**
+     * gets the handle to the singleton instance of memory manager
+     * 
+     * @return the singleton instance of memory manager
+     * @see IFeedMemoryManager
+     */
+    public IFeedMemoryManager getFeedMemoryManager();
+
+    /**
+     * gets the handle to the singleton instance of feed metadata manager
+     * 
+     * @return the singleton instance of feed metadata manager
+     * @see IFeedMetadataManager
+     */
+    public IFeedMetadataManager getFeedMetadataManager();
+
+    /**
+     * gets the handle to the singleton instance of feed metric collector
+     * 
+     * @return the singleton instance of feed metric collector
+     * @see IFeedMetricCollector
+     */
+    public IFeedMetricCollector getFeedMetricCollector();
+
+    /**
+     * gets the handle to the singleton instance of feed message service
+     * 
+     * @return the singleton instance of feed message service
+     * @see IFeedMessageService
+     */
+    public IFeedMessageService getFeedMessageService();
+
+    /**
+     * gets the asterix configuration
+     * 
+     * @return asterix configuration
+     * @see AsterixFeedProperties
+     */
+    public AsterixFeedProperties getAsterixFeedProperties();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedMemoryComponent.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedMemoryComponent.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedMemoryComponent.java
new file mode 100644
index 0000000..da23ce9
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedMemoryComponent.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds.api;
+
+/**
+ * Represents an in-memory components required for storing frames that contain feed tuples.
+ * The component's memory footprint is measured and regulated by the {@link IFeedMemoryManager}.
+ * Any expansion in size is accounted and can be restricted by the {@link IFeedMemoryManager}
+ **/
+public interface IFeedMemoryComponent {
+
+    public enum Type {
+
+        /** A pool of reusable frames **/
+        POOL,
+
+        /** An ordered list of frames **/
+        COLLECTION
+    }
+
+    /** Gets the unique id associated with the memory component **/
+    public int getComponentId();
+
+    /** Gets the type associated with the component. **/
+    public Type getType();
+
+    /** Gets the current size (number of allocated frames) of the component. **/
+    public int getTotalAllocation();
+
+    /**
+     * Expands this memory component by the speficied number of frames
+     * 
+     * @param delta
+     *            the amount (measured in number of frames) by which this memory component
+     *            should be expanded
+     */
+    public void expand(int delta);
+
+    /** Clears the allocated frames as a step to reclaim the memory **/
+    public void reset();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedMemoryManager.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedMemoryManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedMemoryManager.java
new file mode 100644
index 0000000..8b3322e
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedMemoryManager.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds.api;
+
+import edu.uci.ics.asterix.common.feeds.api.IFeedMemoryComponent.Type;
+
+/**
+ * Provides management of memory allocated for handling feed data flow through the node controller
+ */
+public interface IFeedMemoryManager {
+
+    public static final int START_COLLECTION_SIZE = 20;
+    public static final int START_POOL_SIZE = 10;
+
+    /**
+     * Gets a memory component allocated from the feed memory budget
+     * 
+     * @param type
+     *            the kind of memory component that needs to be allocated
+     * @return
+     * @see Type
+     */
+    public IFeedMemoryComponent getMemoryComponent(Type type);
+
+    /**
+     * Expand a memory component by the default increment
+     * 
+     * @param memoryComponent
+     * @return true if the expansion succeeded
+     *         false if the requested expansion violates the configured budget
+     */
+    public boolean expandMemoryComponent(IFeedMemoryComponent memoryComponent);
+
+    /**
+     * Releases the given memory component to reclaim the memory allocated for the component
+     * 
+     * @param memoryComponent
+     *            the memory component that is being reclaimed/released
+     */
+    public void releaseMemoryComponent(IFeedMemoryComponent memoryComponent);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedMessage.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedMessage.java
new file mode 100644
index 0000000..290b63e
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedMessage.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds.api;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.dataflow.value.JSONSerializable;
+
+/**
+ * A control message exchanged between {@Link IFeedManager} and {@Link CentralFeedManager} that requests for an action or reporting of an event
+ */
+public interface IFeedMessage extends Serializable, JSONSerializable {
+
+    public enum MessageType {
+        END,
+        XAQL,
+        FEED_REPORT,
+        NODE_REPORT,
+        STORAGE_REPORT,
+        CONGESTION,
+        PREPARE_STALL,
+        TERMINATE_FLOW,
+        SCALE_IN_REQUEST,
+        COMMIT_ACK,
+        COMMIT_ACK_RESPONSE,
+        THROTTLING_ENABLED
+    }
+
+    /**
+     * Gets the type associated with this message
+     * 
+     * @return MessageType type associated with this message
+     */
+    public MessageType getMessageType();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedMessageService.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedMessageService.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedMessageService.java
new file mode 100644
index 0000000..9086099
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedMessageService.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds.api;
+
+/**
+ * Provides the functionality of sending a meesage ({@code IFeedMessage} to the {@code CentralFeedManager}
+ */
+public interface IFeedMessageService extends IFeedService {
+
+    /**
+     * Sends a message ({@code IFeedMessage} to the {@code CentralFeedManager} running at the CC
+     * The message is sent asynchronously.
+     * 
+     * @param message
+     *            the message to be sent
+     */
+    public void sendMessage(IFeedMessage message);
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedMetadataManager.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedMetadataManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedMetadataManager.java
new file mode 100644
index 0000000..fc0dd4e
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedMetadataManager.java
@@ -0,0 +1,21 @@
+package edu.uci.ics.asterix.common.feeds.api;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+
+public interface IFeedMetadataManager {
+
+    /**
+     * @param feedConnectionId
+     *            connection id corresponding to the feed connection
+     * @param tuple
+     *            the erroneous tuple that raised an exception
+     * @param message
+     *            the message corresponding to the exception being raised
+     * @param feedManager
+     * @throws AsterixException
+     */
+    public void logTuple(FeedConnectionId feedConnectionId, String tuple, String message, IFeedManager feedManager)
+            throws AsterixException;
+
+}


Mime
View raw message