asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject [17/26] incubator-asterixdb git commit: Feed Fixes and Cleanup
Date Thu, 14 Jan 2016 20:32:11 GMT
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/message/StorageReportFeedMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/message/StorageReportFeedMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/message/StorageReportFeedMessage.java
deleted file mode 100644
index 6f2c102..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/message/StorageReportFeedMessage.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds.message;
-
-import org.json.JSONException;
-import org.json.JSONObject;
-
-import org.apache.asterix.common.feeds.FeedConnectionId;
-import org.apache.asterix.common.feeds.FeedConstants;
-import org.apache.asterix.common.feeds.FeedConstants.MessageConstants;
-import org.apache.asterix.common.feeds.FeedId;
-
-/**
- * A feed control message sent from a storage runtime of a feed pipeline to report the intake timestamp corresponding
- * to the last persisted tuple.
- */
-public class StorageReportFeedMessage extends FeedMessage {
-
-    private static final long serialVersionUID = 1L;
-
-    private final FeedConnectionId connectionId;
-    private final int partition;
-    private long lastPersistedTupleIntakeTimestamp;
-    private boolean persistenceDelayWithinLimit;
-    private long averageDelay;
-    private int intakePartition;
-
-    public StorageReportFeedMessage(FeedConnectionId connectionId, int partition,
-            long lastPersistedTupleIntakeTimestamp, boolean persistenceDelayWithinLimit, long averageDelay,
-            int intakePartition) {
-        super(MessageType.STORAGE_REPORT);
-        this.connectionId = connectionId;
-        this.partition = partition;
-        this.lastPersistedTupleIntakeTimestamp = lastPersistedTupleIntakeTimestamp;
-        this.persistenceDelayWithinLimit = persistenceDelayWithinLimit;
-        this.averageDelay = averageDelay;
-        this.intakePartition = intakePartition;
-    }
-
-    @Override
-    public String toString() {
-        return messageType.name() + " " + connectionId + " [" + lastPersistedTupleIntakeTimestamp + "] ";
-    }
-
-    public FeedConnectionId getConnectionId() {
-        return connectionId;
-    }
-
-    public long getLastPersistedTupleIntakeTimestamp() {
-        return lastPersistedTupleIntakeTimestamp;
-    }
-
-    public int getPartition() {
-        return partition;
-    }
-
-    public boolean isPersistenceDelayWithinLimit() {
-        return persistenceDelayWithinLimit;
-    }
-
-    public void setPersistenceDelayWithinLimit(boolean persistenceDelayWithinLimit) {
-        this.persistenceDelayWithinLimit = persistenceDelayWithinLimit;
-    }
-
-    public long getAverageDelay() {
-        return averageDelay;
-    }
-
-    public void setAverageDelay(long averageDelay) {
-        this.averageDelay = averageDelay;
-    }
-
-    public int getIntakePartition() {
-        return intakePartition;
-    }
-
-    @Override
-    public JSONObject toJSON() throws JSONException {
-        JSONObject obj = new JSONObject();
-        obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name());
-        obj.put(FeedConstants.MessageConstants.DATAVERSE, connectionId.getFeedId().getDataverse());
-        obj.put(FeedConstants.MessageConstants.FEED, connectionId.getFeedId().getFeedName());
-        obj.put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName());
-        obj.put(FeedConstants.MessageConstants.LAST_PERSISTED_TUPLE_INTAKE_TIMESTAMP, lastPersistedTupleIntakeTimestamp);
-        obj.put(MessageConstants.PERSISTENCE_DELAY_WITHIN_LIMIT, persistenceDelayWithinLimit);
-        obj.put(MessageConstants.AVERAGE_PERSISTENCE_DELAY, averageDelay);
-        obj.put(FeedConstants.MessageConstants.PARTITION, partition);
-        obj.put(FeedConstants.MessageConstants.INTAKE_PARTITION, intakePartition);
-
-        return obj;
-    }
-
-    public static StorageReportFeedMessage read(JSONObject obj) throws JSONException {
-        FeedId feedId = new FeedId(obj.getString(FeedConstants.MessageConstants.DATAVERSE),
-                obj.getString(FeedConstants.MessageConstants.FEED));
-        FeedConnectionId connectionId = new FeedConnectionId(feedId,
-                obj.getString(FeedConstants.MessageConstants.DATASET));
-        int partition = obj.getInt(FeedConstants.MessageConstants.PARTITION);
-        long timestamp = obj.getLong(FeedConstants.MessageConstants.LAST_PERSISTED_TUPLE_INTAKE_TIMESTAMP);
-        boolean persistenceDelayWithinLimit = obj.getBoolean(MessageConstants.PERSISTENCE_DELAY_WITHIN_LIMIT);
-        long averageDelay = obj.getLong(MessageConstants.AVERAGE_PERSISTENCE_DELAY);
-        int intakePartition = obj.getInt(MessageConstants.INTAKE_PARTITION);
-        return new StorageReportFeedMessage(connectionId, partition, timestamp, persistenceDelayWithinLimit,
-                averageDelay, intakePartition);
-    }
-
-    public void reset(long lastPersistedTupleIntakeTimestamp, boolean delayWithinLimit, long averageDelay) {
-        this.lastPersistedTupleIntakeTimestamp = lastPersistedTupleIntakeTimestamp;
-        this.persistenceDelayWithinLimit = delayWithinLimit;
-        this.averageDelay = averageDelay;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/message/ThrottlingEnabledFeedMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/message/ThrottlingEnabledFeedMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/message/ThrottlingEnabledFeedMessage.java
deleted file mode 100644
index 0ddd8ba..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/message/ThrottlingEnabledFeedMessage.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds.message;
-
-import org.json.JSONException;
-import org.json.JSONObject;
-
-import org.apache.asterix.common.feeds.FeedConnectionId;
-import org.apache.asterix.common.feeds.FeedConstants;
-import org.apache.asterix.common.feeds.FeedId;
-import org.apache.asterix.common.feeds.FeedRuntimeId;
-import org.apache.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
-
-/**
- * A feed control message indicating the need to end the feed. This message is dispatched
- * to all locations that host an operator involved in the feed pipeline.
- */
-public class ThrottlingEnabledFeedMessage extends FeedMessage {
-
-    private static final long serialVersionUID = 1L;
-
-    private final FeedConnectionId connectionId;
-
-    private final FeedRuntimeId runtimeId;
-
-    public ThrottlingEnabledFeedMessage(FeedConnectionId connectionId, FeedRuntimeId runtimeId) {
-        super(MessageType.THROTTLING_ENABLED);
-        this.connectionId = connectionId;
-        this.runtimeId = runtimeId;
-    }
-
-    @Override
-    public String toString() {
-        return MessageType.END.name() + "  " + connectionId + " [" + runtimeId + "] ";
-    }
-
-    @Override
-    public JSONObject toJSON() throws JSONException {
-        JSONObject obj = new JSONObject();
-        obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name());
-        obj.put(FeedConstants.MessageConstants.DATAVERSE, connectionId.getFeedId().getDataverse());
-        obj.put(FeedConstants.MessageConstants.FEED, connectionId.getFeedId().getFeedName());
-        obj.put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName());
-        obj.put(FeedConstants.MessageConstants.RUNTIME_TYPE, runtimeId.getFeedRuntimeType());
-        obj.put(FeedConstants.MessageConstants.OPERAND_ID, runtimeId.getOperandId());
-        obj.put(FeedConstants.MessageConstants.PARTITION, runtimeId.getPartition());
-        return obj;
-    }
-
-    public FeedConnectionId getConnectionId() {
-        return connectionId;
-    }
-
-    public FeedRuntimeId getFeedRuntimeId() {
-        return runtimeId;
-    }
-
-    public static ThrottlingEnabledFeedMessage read(JSONObject obj) throws JSONException {
-        FeedId feedId = new FeedId(obj.getString(FeedConstants.MessageConstants.DATAVERSE),
-                obj.getString(FeedConstants.MessageConstants.FEED));
-        FeedConnectionId connectionId = new FeedConnectionId(feedId,
-                obj.getString(FeedConstants.MessageConstants.DATASET));
-        FeedRuntimeId runtimeId = new FeedRuntimeId(FeedRuntimeType.valueOf(obj
-                .getString(FeedConstants.MessageConstants.RUNTIME_TYPE)),
-                obj.getInt(FeedConstants.MessageConstants.PARTITION),
-                obj.getString(FeedConstants.MessageConstants.OPERAND_ID));
-        return new ThrottlingEnabledFeedMessage(connectionId, runtimeId);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwarder.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwarder.java b/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwarder.java
index 5ee065a..27f4fcb 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwarder.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwarder.java
@@ -32,10 +32,11 @@ public interface ITupleForwarder {
     public enum TupleForwardPolicy {
         FRAME_FULL,
         COUNTER_TIMER_EXPIRED,
-        RATE_CONTROLLED
+        RATE_CONTROLLED,
+        FEED
     }
 
-    public void configure(Map<String, String> configuration);
+    public void configure(Map<String, String> configuration) throws HyracksDataException;
 
     public void initialize(IHyracksCommonContext ctx, IFrameWriter frameWriter) throws HyracksDataException;
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
new file mode 100644
index 0000000..acfb9d5
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.utils;
+
+import java.io.File;
+
+import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import org.apache.hyracks.dataflow.std.file.FileSplit;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+
+public class StoragePathUtil {
+    public static final String PARTITION_DIR_PREFIX = "partition_";
+    public static final String TEMP_DATASETS_STORAGE_FOLDER = "temp";
+    public static final String DATASET_INDEX_NAME_SEPARATOR = "_idx_";
+    public static final String ADAPTER_INSTANCE_PREFIX = "adapter_";
+
+    public static Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraints(
+            FileSplit[] splits) {
+        IFileSplitProvider splitProvider = new ConstantFileSplitProvider(splits);
+        String[] loc = new String[splits.length];
+        for (int p = 0; p < splits.length; p++) {
+            loc[p] = splits[p].getNodeName();
+        }
+        AlgebricksPartitionConstraint pc = new AlgebricksAbsolutePartitionConstraint(loc);
+        return new Pair<IFileSplitProvider, AlgebricksPartitionConstraint>(splitProvider, pc);
+    }
+
+    public static FileSplit getFileSplitForClusterPartition(ClusterPartition partition, File relativeFile) {
+        return new FileSplit(partition.getActiveNodeId(), new FileReference(relativeFile), partition.getIODeviceNum(),
+                partition.getPartitionId());
+    }
+
+    public static String prepareStoragePartitionPath(String storageDirName, int partitonId) {
+        return storageDirName + File.separator + StoragePathUtil.PARTITION_DIR_PREFIX + partitonId;
+    }
+
+    public static String prepareDataverseIndexName(String dataverseName, String datasetName, String idxName) {
+        return dataverseName + File.separator + datasetName + StoragePathUtil.DATASET_INDEX_NAME_SEPARATOR + idxName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
index 2e7158d..e957ac6 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
@@ -21,10 +21,10 @@ package org.apache.asterix.external.adapter.factory;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.asterix.common.feeds.api.IDataSourceAdapter;
 import org.apache.asterix.external.api.IAdapterFactory;
 import org.apache.asterix.external.api.IDataFlowController;
 import org.apache.asterix.external.api.IDataParserFactory;
+import org.apache.asterix.external.api.IDataSourceAdapter;
 import org.apache.asterix.external.api.IExternalDataSourceFactory;
 import org.apache.asterix.external.api.IIndexibleExternalDataSource;
 import org.apache.asterix.external.api.IIndexingAdapterFactory;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
index 9539278..bf2db9a 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
@@ -21,7 +21,6 @@ package org.apache.asterix.external.api;
 import java.io.Serializable;
 import java.util.Map;
 
-import org.apache.asterix.common.feeds.api.IDataSourceAdapter;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.api.context.IHyracksTaskContext;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterRuntimeManager.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterRuntimeManager.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterRuntimeManager.java
new file mode 100644
index 0000000..252b43b
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterRuntimeManager.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.api;
+
+import org.apache.asterix.external.feed.api.IIntakeProgressTracker;
+import org.apache.asterix.external.feed.management.FeedId;
+
+public interface IAdapterRuntimeManager {
+
+    public enum State {
+        /**
+         * Indicates that AsterixDB is maintaining the flow of data from external source into its storage.
+         */
+        ACTIVE_INGESTION,
+
+        /**
+         * Indicates that data from external source is being buffered and not
+         * pushed downstream
+         */
+
+        INACTIVE_INGESTION,
+        /**
+         * Indicates that feed ingestion activity has finished.
+         */
+        FINISHED_INGESTION,
+
+        /** Indicates the occurrence of a failure during the intake stage of a data ingestion pipeline **/
+        FAILED_INGESTION
+    }
+
+    /**
+     * Start feed ingestion
+     * @throws Exception
+     */
+    public void start() throws Exception;
+
+    /**
+     * Stop feed ingestion.
+     * @throws Exception
+     */
+    public void stop() throws Exception;
+
+    /**
+     * @return feedId associated with the feed that is being ingested
+     */
+    public FeedId getFeedId();
+
+    /**
+     * @return the instance of the feed adapter (an implementation of {@code IFeedAdapter}) in use.
+     */
+    public IFeedAdapter getFeedAdapter();
+
+    /**
+     * @return state associated with the AdapterRuntimeManager. See {@code State}.
+     */
+    public State getState();
+
+    /**
+     * @param state
+     */
+    public void setState(State state);
+
+    public IIntakeProgressTracker getProgressTracker();
+
+    public int getPartition();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
index f5f47ec..e4435a1 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
@@ -18,7 +18,6 @@
  */
 package org.apache.asterix.external.api;
 
-import java.io.IOException;
 import java.util.Map;
 
 import org.apache.asterix.common.parse.ITupleForwarder;
@@ -39,11 +38,21 @@ public interface IDataFlowController {
      * 3. setTupleForwarder(forwarder)
      * 4. configure(configuration,ctx)
      * 5. start(writer)
+     *
+     * pause(), resume(), and stop() are only used with feeds
+     * pause is called after start when a feed is running and the system is overwhelmed with data.
+     * resume is called after the load goes down and we are ready to receive more data.
+     * stop is called to disconnect the feed. once stop is called, no other method is called.
+     *
      */
 
     public void start(IFrameWriter writer) throws HyracksDataException;
 
-    public boolean stop();
+    public boolean stop() throws HyracksDataException;
+
+    public boolean pause() throws HyracksDataException;
+
+    public boolean resume() throws HyracksDataException;
 
     public boolean handleException(Throwable th);
 
@@ -51,5 +60,5 @@ public interface IDataFlowController {
 
     public void setTupleForwarder(ITupleForwarder forwarder);
 
-    public void configure(Map<String, String> configuration, IHyracksTaskContext ctx) throws IOException;
+    public void configure(Map<String, String> configuration, IHyracksTaskContext ctx) throws HyracksDataException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java
new file mode 100644
index 0000000..e37f2b1
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.api;
+
+import java.io.Serializable;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+
+/**
+ * A super interface implemented by a data source adapter. An adapter can be a
+ * pull based or push based. This interface provides all common APIs that need
+ * to be implemented by each adapter irrespective of the the kind of
+ * adapter(pull or push).
+ */
+public interface IDataSourceAdapter extends Serializable {
+
+    public enum AdapterType {
+        INTERNAL,
+        EXTERNAL
+    }
+
+    /**
+     * Triggers the adapter to begin ingesting data from the external source.
+     * @param partition
+     *            The adapter could be running with a degree of parallelism.
+     *            partition corresponds to the i'th parallel instance.
+     * @param writer
+     *            The instance of frame writer that is used by the adapter to
+     *            write frame to. Adapter packs the fetched bytes (from external source),
+     *            packs them into frames and forwards the frames to an upstream receiving
+     *            operator using the instance of IFrameWriter.
+     * @throws Exception
+     */
+    public void start(int partition, IFrameWriter writer) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/api/IFeedAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IFeedAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IFeedAdapter.java
new file mode 100644
index 0000000..3261556
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IFeedAdapter.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.api;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface IFeedAdapter extends IDataSourceAdapter {
+    /**
+     * Pause the ingestion of data.
+     * @throws HyracksDataException
+     * @throws Exception
+     */
+    public boolean pause() throws HyracksDataException;
+
+    /**
+     * Resume the ingestion of data.
+     * @throws HyracksDataException
+     * @throws Exception
+     */
+    public boolean resume() throws HyracksDataException;
+
+    /**
+     * Discontinue the ingestion of data.
+     * @throws Exception
+     */
+    public boolean stop() throws Exception;
+
+    /**
+     * @param e
+     * @return true if the ingestion should continue post the exception else false
+     * @throws Exception
+     */
+    public boolean handleException(Throwable e);
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamDataParser.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamDataParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamDataParser.java
index 31d6317..531d050 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamDataParser.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamDataParser.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.external.api;
 
 import java.io.DataOutput;
+import java.io.IOException;
 import java.io.InputStream;
 
 public interface IStreamDataParser extends IDataParser {
@@ -30,10 +31,17 @@ public interface IStreamDataParser extends IDataParser {
     /**
      * Parse data into output AsterixDataModel binary records.
      * Used with parsers that support stream sources
-     *
      * @param out
      *            DataOutput instance that for writing the parser output.
      */
-
     public boolean parse(DataOutput out) throws Exception;
+
+    /**
+     * reset the parser state. this is called when a failure takes place
+     * and the job needs to continue and to do that, the parser need to
+     * be in a consistent state
+     * @return true if reset was successful, false, otherwise
+     * @throws IOException
+     */
+    public boolean reset(InputStream in) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractDataFlowController.java
index d06161e..c5c8e48 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractDataFlowController.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractDataFlowController.java
@@ -18,7 +18,6 @@
  */
 package org.apache.asterix.external.dataflow;
 
-import java.io.IOException;
 import java.util.Map;
 
 import org.apache.asterix.common.parse.ITupleForwarder;
@@ -48,7 +47,7 @@ public abstract class AbstractDataFlowController implements IDataFlowController
     }
 
     @Override
-    public void configure(Map<String, String> configuration, IHyracksTaskContext ctx) throws IOException {
+    public void configure(Map<String, String> configuration, IHyracksTaskContext ctx) {
         this.configuration = configuration;
         this.ctx = ctx;
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
new file mode 100644
index 0000000..aab4bf6
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.dataflow;
+
+import java.util.Map;
+
+import org.apache.asterix.common.parse.ITupleForwarder;
+import org.apache.asterix.external.api.IDataFlowController;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+
+public abstract class AbstractFeedDataFlowController implements IDataFlowController {
+    protected FeedTupleForwarder tupleForwarder;
+    protected IHyracksTaskContext ctx;
+    protected Map<String, String> configuration;
+    protected static final int NUMBER_OF_TUPLE_FIELDS = 1;
+    protected ArrayTupleBuilder tb = new ArrayTupleBuilder(NUMBER_OF_TUPLE_FIELDS);
+
+    @Override
+    public ITupleForwarder getTupleForwarder() {
+        return tupleForwarder;
+    }
+
+    @Override
+    public void setTupleForwarder(ITupleForwarder tupleForwarder) {
+        this.tupleForwarder = (FeedTupleForwarder) tupleForwarder;
+    }
+
+    protected void initializeTupleForwarder(IFrameWriter writer) throws HyracksDataException {
+        tupleForwarder.initialize(ctx, writer);
+    }
+
+    @Override
+    public void configure(Map<String, String> configuration, IHyracksTaskContext ctx) {
+        this.configuration = configuration;
+        this.ctx = ctx;
+    }
+
+    @Override
+    public boolean pause() {
+        tupleForwarder.pause();
+        return true;
+    }
+
+    @Override
+    public boolean resume() {
+        tupleForwarder.resume();
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
new file mode 100644
index 0000000..fe4557d
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.dataflow;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordDataParser;
+import org.apache.asterix.external.api.IRecordFlowController;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.util.ExternalDataExceptionUtils;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowController
+        implements IRecordFlowController<T> {
+    protected IRecordDataParser<T> dataParser;
+    protected IRecordReader<? extends T> recordReader;
+    protected long interval;
+    protected AtomicBoolean closed = new AtomicBoolean(false);
+
+    @Override
+    public void start(IFrameWriter writer) throws HyracksDataException {
+        HyracksDataException hde = null;
+        try {
+            initializeTupleForwarder(writer);
+            while (recordReader.hasNext()) {
+                IRawRecord<? extends T> record = recordReader.next();
+                if (record == null) {
+                    Thread.sleep(interval);
+                    continue;
+                }
+                tb.reset();
+                dataParser.parse(record, tb.getDataOutput());
+                tb.addFieldEndOffset();
+                tupleForwarder.addTuple(tb);
+            }
+        } catch (Throwable th) {
+            hde = new HyracksDataException(th);
+        }
+        try {
+            tupleForwarder.close();
+        } catch (Throwable th) {
+            hde = ExternalDataExceptionUtils.suppress(hde, th);
+        }
+        try {
+            recordReader.close();
+        } catch (Throwable th) {
+            hde = ExternalDataExceptionUtils.suppress(hde, th);
+            throw hde;
+        } finally {
+            closeSignal();
+        }
+    }
+
+    private void closeSignal() {
+        synchronized (closed) {
+            closed.set(true);
+            closed.notifyAll();
+        }
+    }
+
+    private void waitForSignal() throws InterruptedException {
+        synchronized (closed) {
+            while (!closed.get()) {
+                closed.wait();
+            }
+        }
+    }
+
+    @Override
+    public boolean stop() throws HyracksDataException {
+        if (recordReader.stop()) {
+            try {
+                waitForSignal();
+            } catch (InterruptedException e) {
+                throw new HyracksDataException(e);
+            }
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public boolean handleException(Throwable th) {
+        return true;
+    }
+
+    @Override
+    public void setRecordParser(IRecordDataParser<T> dataParser) {
+        this.dataParser = dataParser;
+    }
+
+    @Override
+    public void setRecordReader(IRecordReader<T> recordReader) {
+        this.recordReader = recordReader;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
new file mode 100644
index 0000000..4ef5f6d
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.dataflow;
+
+import org.apache.asterix.external.api.IStreamDataParser;
+import org.apache.asterix.external.api.IStreamFlowController;
+import org.apache.asterix.external.input.stream.AInputStream;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class FeedStreamDataFlowController extends AbstractFeedDataFlowController implements IStreamFlowController {
+
+    private IStreamDataParser dataParser;
+    private AInputStream stream;
+
+    @Override
+    public void start(IFrameWriter writer) throws HyracksDataException {
+        try {
+            initializeTupleForwarder(writer);
+            while (true) {
+                tb.reset();
+                if (!dataParser.parse(tb.getDataOutput())) {
+                    break;
+                }
+                tb.addFieldEndOffset();
+                tupleForwarder.addTuple(tb);
+            }
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        } finally {
+            tupleForwarder.close();
+        }
+    }
+
+    @Override
+    public boolean stop() throws HyracksDataException {
+        try {
+            if (stream.stop()) {
+                return true;
+            }
+            stream.close();
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+        return false;
+    }
+
+    @Override
+    public boolean handleException(Throwable th) {
+        boolean handled = true;
+        try {
+            handled &= stream.skipError();
+            if (handled) {
+                handled &= dataParser.reset(stream);
+            }
+        } catch (Exception e) {
+            th.addSuppressed(e);
+            return false;
+        }
+        return handled;
+    }
+
+    @Override
+    public void setStreamParser(IStreamDataParser dataParser) {
+        this.dataParser = dataParser;
+    }
+
+    public void setStream(AInputStream stream) {
+        this.stream = stream;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
new file mode 100644
index 0000000..d170766
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.dataflow;
+
+import java.util.Map;
+
+import org.apache.asterix.common.parse.ITupleForwarder;
+import org.apache.asterix.external.util.DataflowUtils;
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+
+public class FeedTupleForwarder implements ITupleForwarder {
+
+    private FrameTupleAppender appender;
+    private IFrame frame;
+    private IFrameWriter writer;
+    private boolean paused = false;
+
+    @Override
+    public void configure(Map<String, String> configuration) {
+    }
+
+    @Override
+    public void initialize(IHyracksCommonContext ctx, IFrameWriter writer) throws HyracksDataException {
+        this.frame = new VSizeFrame(ctx);
+        this.writer = writer;
+        this.appender = new FrameTupleAppender(frame);
+    }
+
+    @Override
+    public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException {
+        if (paused) {
+            synchronized (this) {
+                while (paused) {
+                    try {
+                        wait();
+                    } catch (InterruptedException e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+            }
+        }
+        DataflowUtils.addTupleToFrame(appender, tb, writer);
+    }
+
+    public void pause() {
+        paused = true;
+    }
+
+    public synchronized void resume() {
+        paused = false;
+        notifyAll();
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        if (appender.getTupleCount() > 0) {
+            FrameUtils.flushFrame(frame.getBuffer(), writer);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java
index ad8e791..9353a40 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java
@@ -57,7 +57,7 @@ public class RecordDataFlowController<T> extends AbstractDataFlowController impl
 
     @Override
     public boolean stop() {
-        return false;
+        return recordReader.stop();
     }
 
     @Override
@@ -74,4 +74,14 @@ public class RecordDataFlowController<T> extends AbstractDataFlowController impl
     public void setRecordReader(IRecordReader<T> recordReader) throws Exception {
         this.recordReader = recordReader;
     }
+
+    @Override
+    public boolean pause() throws HyracksDataException {
+        return false;
+    }
+
+    @Override
+    public boolean resume() throws HyracksDataException {
+        return false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/StreamDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/StreamDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/StreamDataFlowController.java
index 3016470..43738eb 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/StreamDataFlowController.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/StreamDataFlowController.java
@@ -61,4 +61,14 @@ public class StreamDataFlowController extends AbstractDataFlowController impleme
     public void setStreamParser(IStreamDataParser dataParser) {
         this.dataParser = dataParser;
     }
+
+    @Override
+    public boolean pause() throws HyracksDataException {
+        return false;
+    }
+
+    @Override
+    public boolean resume() throws HyracksDataException {
+        return false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/AdapterIdentifier.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/AdapterIdentifier.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/AdapterIdentifier.java
new file mode 100644
index 0000000..6c02500
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/AdapterIdentifier.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.dataset.adapter;
+
+import java.io.Serializable;
+
+/**
+ * A unique identifier for a data source adapter.
+ */
+public class AdapterIdentifier implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final String dataverseName;
+    private final String adapterName;
+
+    public AdapterIdentifier(String namespace, String name) {
+        this.dataverseName = namespace;
+        this.adapterName = name;
+    }
+
+    public String getNamespace() {
+        return dataverseName;
+    }
+
+    public String getName() {
+        return adapterName;
+    }
+
+    @Override
+    public int hashCode() {
+        return (dataverseName + "@" + adapterName).hashCode();
+
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null) {
+            return false;
+        }
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof AdapterIdentifier)) {
+            return false;
+        }
+        AdapterIdentifier a = (AdapterIdentifier) o;
+        return dataverseName.equals(a.getNamespace()) && adapterName.equals(a.getName());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
index 74e98dd..d19eedf 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
@@ -18,11 +18,12 @@
  */
 package org.apache.asterix.external.dataset.adapter;
 
-import org.apache.asterix.common.feeds.api.IDataSourceAdapter;
 import org.apache.asterix.external.api.IDataFlowController;
+import org.apache.asterix.external.api.IFeedAdapter;
 import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
-public class GenericAdapter implements IDataSourceAdapter {
+public class GenericAdapter implements IFeedAdapter {
 
     private static final long serialVersionUID = 1L;
     private final IDataFlowController controller;
@@ -32,12 +33,12 @@ public class GenericAdapter implements IDataSourceAdapter {
     }
 
     @Override
-    public void start(int partition, IFrameWriter writer) throws Exception {
+    public void start(int partition, IFrameWriter writer) throws HyracksDataException {
         controller.start(writer);
     }
 
     @Override
-    public boolean stop() throws Exception {
+    public boolean stop() throws HyracksDataException {
         return controller.stop();
     }
 
@@ -45,4 +46,14 @@ public class GenericAdapter implements IDataSourceAdapter {
     public boolean handleException(Throwable e) {
         return controller.handleException(e);
     }
+
+    @Override
+    public boolean pause() throws HyracksDataException {
+        return controller.pause();
+    }
+
+    @Override
+    public boolean resume() throws HyracksDataException {
+        return controller.resume();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/StreamBasedAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/StreamBasedAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/StreamBasedAdapter.java
deleted file mode 100644
index 3f10dc4..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/StreamBasedAdapter.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.dataset.adapter;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.feeds.api.IDataSourceAdapter;
-import org.apache.asterix.om.types.IAType;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.std.file.ITupleParser;
-import org.apache.hyracks.dataflow.std.file.ITupleParserFactory;
-
-public abstract class StreamBasedAdapter implements IDataSourceAdapter {
-
-    private static final long serialVersionUID = 1L;
-
-    protected static final Logger LOGGER = Logger.getLogger(StreamBasedAdapter.class.getName());
-
-    public abstract InputStream getInputStream(int partition) throws IOException;
-
-    protected final ITupleParser tupleParser;
-
-    protected final IAType sourceDatatype;
-
-    public StreamBasedAdapter(ITupleParserFactory parserFactory, IAType sourceDatatype, IHyracksTaskContext ctx,
-            int partition) throws HyracksDataException {
-        this.tupleParser = parserFactory.createTupleParser(ctx);
-        this.sourceDatatype = sourceDatatype;
-    }
-
-    @Override
-    public void start(int partition, IFrameWriter writer) throws Exception {
-        InputStream in = getInputStream(partition);
-        if (in != null) {
-            tupleParser.parse(in, writer);
-        } else {
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning(
-                        "Could not obtain input stream for parsing from adapter " + this + "[" + partition + "]");
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ICentralFeedManager.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ICentralFeedManager.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ICentralFeedManager.java
new file mode 100644
index 0000000..4f0ed77
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ICentralFeedManager.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.api;
+
+import java.io.IOException;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+
+public interface ICentralFeedManager {
+
+    public void start() throws AsterixException;
+
+    public void stop() throws AsterixException, IOException;
+
+    public IFeedTrackingManager getFeedTrackingManager();
+
+    public IFeedLoadManager getFeedLoadManager();
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IExceptionHandler.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IExceptionHandler.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IExceptionHandler.java
new file mode 100644
index 0000000..ec0af1c
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IExceptionHandler.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.api;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Handles an exception encountered during processing of a data frame.
+ * In the case when the exception is of type {@code FrameDataException}, the causing
+ * tuple is logged and a new frame with tuple after the exception-generating tuple
+ * is returned. This funcitonality is used during feed ingestion to bypass an exception
+ * generating tuple and thus avoid the data flow from terminating
+ */
+public interface IExceptionHandler {
+
+    /**
+     * @param e
+     *            the exception that needs to be handled
+     * @param frame
+     *            the frame that was being processed when exception occurred
+     * @return returns a new frame with tuples after the exception generating tuple
+     * @throws HyracksDataException
+     */
+    public ByteBuffer handleException(Exception e, ByteBuffer frame);
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeed.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeed.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeed.java
new file mode 100644
index 0000000..6865522
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeed.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.api;
+
+import java.io.Serializable;
+import java.util.Map;
+
+import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.external.feed.management.FeedId;
+
+public interface IFeed extends Serializable {
+
+    public enum FeedType {
+        /**
+         * A feed that derives its data from an external source.
+         */
+        PRIMARY,
+
+        /**
+         * A feed that derives its data from another primary or secondary feed.
+         */
+        SECONDARY
+    }
+
+    public FeedType getFeedType();
+
+    public FunctionSignature getAppliedFunction();
+
+    public String getFeedName();
+
+    public String getDataverseName();
+
+    public FeedId getFeedId();
+
+    public Map<String, String> getAdapterConfiguration();
+
+    public String getAdapterName();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedConnectionManager.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedConnectionManager.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedConnectionManager.java
new file mode 100644
index 0000000..35d4cd7
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedConnectionManager.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.api;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.management.FeedRuntimeManager;
+import org.apache.asterix.external.feed.runtime.FeedRuntime;
+import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
+
+/**
+ * Handle (de)registration of feeds for delivery of control messages.
+ */
+public interface IFeedConnectionManager {
+
+    /**
+     * Allows registration of a feedRuntime.
+     * 
+     * @param feedRuntime
+     * @throws Exception
+     */
+    public void registerFeedRuntime(FeedConnectionId connectionId, FeedRuntime feedRuntime) throws Exception;
+
+    /**
+     * Obtain feed runtime corresponding to a feedRuntimeId
+     * 
+     * @param feedRuntimeId
+     * @return
+     */
+    public FeedRuntime getFeedRuntime(FeedConnectionId connectionId, FeedRuntimeId feedRuntimeId);
+
+    /**
+     * De-register a feed
+     * 
+     * @param feedConnection
+     * @throws IOException
+     */
+    void deregisterFeed(FeedConnectionId feedConnection);
+
+    /**
+     * Obtain the feed runtime manager associated with a feed.
+     * 
+     * @param feedConnection
+     * @return
+     */
+    public FeedRuntimeManager getFeedRuntimeManager(FeedConnectionId feedConnection);
+
+    /**
+     * Allows de-registration of a feed runtime.
+     * 
+     * @param feedRuntimeId
+     */
+    void deRegisterFeedRuntime(FeedConnectionId connectionId, FeedRuntimeId feedRuntimeId);
+
+    public List<FeedRuntimeId> getRegisteredRuntimes();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedFrameHandler.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedFrameHandler.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedFrameHandler.java
new file mode 100644
index 0000000..9cb0feb
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedFrameHandler.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.api;
+
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+
+import org.apache.asterix.external.feed.dataflow.DataBucket;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface IFeedFrameHandler {
+
+    public void handleFrame(ByteBuffer frame) throws HyracksDataException;
+
+    public void handleDataBucket(DataBucket bucket);
+
+    public void close();
+
+    public Iterator<ByteBuffer> replayData() throws HyracksDataException;
+
+    public String getSummary();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedJoint.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedJoint.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedJoint.java
new file mode 100644
index 0000000..d990e45
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedJoint.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.api;
+
+import java.util.List;
+
+import org.apache.asterix.external.feed.api.IFeedLifecycleListener.ConnectionLocation;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.management.FeedConnectionRequest;
+import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.asterix.external.feed.management.FeedJointKey;
+
+public interface IFeedJoint {
+
+    public enum FeedJointType {
+        /** Feed Joint is located at the intake stage of a primary feed **/
+        INTAKE,
+
+        /** Feed Joint is located at the compute stage of a primary/secondary feed **/
+        COMPUTE
+    }
+
+    public enum State {
+        /** Initial state of a feed joint post creation but prior to scheduling of corresponding Hyracks job. **/
+        CREATED,
+
+        /** State acquired post creation of Hyracks job and known physical locations of the joint **/
+        INITIALIZED,
+
+        /** State acquired post starting of Hyracks job at which point, data begins to flow through the joint **/
+        ACTIVE
+    }
+
+    /**
+     * @return the {@link State} associated with the FeedJoint
+     */
+    public State getState();
+
+    /**
+     * @return the {@link FeedJointType} associated with the FeedJoint
+     */
+    public FeedJointType getType();
+
+    /**
+     * @return the list of data receivers that are
+     *         receiving the data flowing through this FeedJoint
+     */
+    public List<FeedConnectionId> getReceivers();
+
+    /**
+     * @return the list of pending subscription request {@link FeedConnectionRequest} submitted for data flowing through the FeedJoint
+     */
+    public List<FeedConnectionRequest> getConnectionRequests();
+
+    /**
+     * @return the subscription location {@link ConnectionLocation} associated with the FeedJoint
+     */
+    public ConnectionLocation getConnectionLocation();
+
+    /**
+     * @return the unique {@link FeedJointKey} associated with the FeedJoint
+     */
+    public FeedJointKey getFeedJointKey();
+
+    /**
+     * Returns the feed subscriber {@link FeedSubscriber} corresponding to a given feed connection id.
+     * 
+     * @param feedConnectionId
+     *            the unique id of a feed connection
+     * @return an instance of feedConnectionId {@link FeedConnectionId}
+     */
+    public FeedConnectionId getReceiver(FeedConnectionId feedConnectionId);
+
+    /**
+     * @param active
+     */
+    public void setState(State active);
+
+    /**
+     * Remove the subscriber from the set of registered subscribers to the FeedJoint
+     * 
+     * @param connectionId
+     *            the connectionId that needs to be removed
+     */
+    public void removeReceiver(FeedConnectionId connectionId);
+
+    public FeedId getOwnerFeedId();
+
+    /**
+     * Add a feed connectionId to the set of registered subscribers
+     * 
+     * @param connectionId
+     */
+    public void addReceiver(FeedConnectionId connectionId);
+
+    /**
+     * Add a feed subscription request {@link FeedConnectionRequest} for the FeedJoint
+     * 
+     * @param request
+     */
+    public void addConnectionRequest(FeedConnectionRequest request);
+
+    public FeedConnectionId getProvider();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleEventSubscriber.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleEventSubscriber.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleEventSubscriber.java
new file mode 100644
index 0000000..0c8724e
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleEventSubscriber.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.api;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+
+public interface IFeedLifecycleEventSubscriber {
+
+    public enum FeedLifecycleEvent {
+        FEED_INTAKE_STARTED,
+        FEED_COLLECT_STARTED,
+        FEED_INTAKE_FAILURE,
+        FEED_COLLECT_FAILURE,
+        FEED_ENDED
+    }
+
+    public void assertEvent(FeedLifecycleEvent event) throws AsterixException, InterruptedException;
+
+    public void handleFeedEvent(FeedLifecycleEvent event);
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleIntakeEventSubscriber.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleIntakeEventSubscriber.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleIntakeEventSubscriber.java
new file mode 100644
index 0000000..b9caa0d
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleIntakeEventSubscriber.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.api;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.feed.watch.FeedIntakeInfo;
+
+public interface IFeedLifecycleIntakeEventSubscriber extends IFeedLifecycleEventSubscriber {
+
+    public void handleFeedEvent(FeedIntakeInfo iInfo, FeedLifecycleEvent event) throws AsterixException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleListener.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleListener.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleListener.java
new file mode 100644
index 0000000..ce82aaf
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleListener.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.api;
+
+import java.util.List;
+
+import org.apache.asterix.common.api.IClusterEventsSubscriber;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.asterix.external.feed.management.FeedJointKey;
+import org.apache.hyracks.api.job.IJobLifecycleListener;
+
+public interface IFeedLifecycleListener extends IJobLifecycleListener, IClusterEventsSubscriber {
+
+    public enum ConnectionLocation {
+        SOURCE_FEED_INTAKE_STAGE,
+        SOURCE_FEED_COMPUTE_STAGE
+    }
+
+    public IFeedJoint getAvailableFeedJoint(FeedJointKey feedJoinKey);
+
+    public boolean isFeedJointAvailable(FeedJointKey feedJoinKey);
+
+    public List<FeedConnectionId> getActiveFeedConnections(FeedId feedId);
+
+    public List<String> getComputeLocations(FeedId feedId);
+
+    public List<String> getIntakeLocations(FeedId feedId);
+
+    public List<String> getStoreLocations(FeedConnectionId feedId);
+
+    public void registerFeedEventSubscriber(FeedConnectionId connectionId, IFeedLifecycleEventSubscriber subscriber);
+
+    public void deregisterFeedEventSubscriber(FeedConnectionId connectionId, IFeedLifecycleEventSubscriber subscriber);
+
+    public List<String> getCollectLocations(FeedConnectionId feedConnectionId);
+
+    boolean isFeedConnectionActive(FeedConnectionId connectionId);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLoadManager.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLoadManager.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLoadManager.java
new file mode 100644
index 0000000..f511979
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLoadManager.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.api;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.json.JSONException;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.message.FeedCongestionMessage;
+import org.apache.asterix.external.feed.message.FeedReportMessage;
+import org.apache.asterix.external.feed.message.ScaleInReportMessage;
+import org.apache.asterix.external.feed.message.ThrottlingEnabledFeedMessage;
+import org.apache.asterix.external.feed.watch.FeedActivity;
+import org.apache.asterix.external.feed.watch.NodeLoadReport;
+
+public interface IFeedLoadManager {
+
+    public void submitNodeLoadReport(NodeLoadReport report);
+
+    public void reportCongestion(FeedCongestionMessage message) throws JSONException, AsterixException;
+
+    public void submitFeedRuntimeReport(FeedReportMessage message);
+
+    public void submitScaleInPossibleReport(ScaleInReportMessage sm) throws AsterixException, Exception;
+
+    public List<String> getNodes(int required);
+
+    public void reportThrottlingEnabled(ThrottlingEnabledFeedMessage mesg) throws AsterixException, Exception;
+
+    int getOutflowRate(FeedConnectionId connectionId, FeedRuntimeType runtimeType);
+
+    void reportFeedActivity(FeedConnectionId connectionId, FeedActivity activity);
+
+    void removeFeedActivity(FeedConnectionId connectionId);
+    
+    public FeedActivity getFeedActivity(FeedConnectionId connectionId);
+
+    public Collection<FeedActivity> getFeedActivities();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedManager.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedManager.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedManager.java
new file mode 100644
index 0000000..b3ad0a5
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedManager.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.api;
+
+import org.apache.asterix.common.config.AsterixFeedProperties;
+
+/**
+ * Provides access to services related to feed management within a node controller
+ */
+public interface IFeedManager {
+
+    /**
+     * gets the handle to the singleton instance of subscription manager
+     * @return the singleton instance of subscription manager
+     * @see IFeedSubscriptionManager
+     */
+    public IFeedSubscriptionManager getFeedSubscriptionManager();
+
+    /**
+     * gets the handle to the singleton instance of connection manager
+     * @return the singleton instance of connection manager
+     * @see IFeedConnectionManager
+     */
+    public IFeedConnectionManager getFeedConnectionManager();
+
+    /**
+     * gets the handle to the singleton instance of memory manager
+     * @return the singleton instance of memory manager
+     * @see IFeedMemoryManager
+     */
+    public IFeedMemoryManager getFeedMemoryManager();
+
+    /**
+     * gets the handle to the singleton instance of feed metadata manager
+     * @return the singleton instance of feed metadata manager
+     * @see IFeedMetadataManager
+     */
+    public IFeedMetadataManager getFeedMetadataManager();
+
+    /**
+     * gets the handle to the singleton instance of feed metric collector
+     * @return the singleton instance of feed metric collector
+     * @see IFeedMetricCollector
+     */
+    public IFeedMetricCollector getFeedMetricCollector();
+
+    /**
+     * gets the handle to the singleton instance of feed message service
+     * @return the singleton instance of feed message service
+     * @see IFeedMessageService
+     */
+    public IFeedMessageService getFeedMessageService();
+
+    public AsterixFeedProperties getAsterixFeedProperties();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedMemoryComponent.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedMemoryComponent.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedMemoryComponent.java
new file mode 100644
index 0000000..313dc1b
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedMemoryComponent.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.api;
+
+/**
+ * Represents an in-memory components required for storing frames that contain feed tuples.
+ * The component's memory footprint is measured and regulated by the {@link IFeedMemoryManager}.
+ * Any expansion in size is accounted and can be restricted by the {@link IFeedMemoryManager}
+ **/
+public interface IFeedMemoryComponent {
+
+    public enum Type {
+
+        /** A pool of reusable frames **/
+        POOL,
+
+        /** An ordered list of frames **/
+        COLLECTION
+    }
+
+    /** Gets the unique id associated with the memory component **/
+    public int getComponentId();
+
+    /** Gets the type associated with the component. **/
+    public Type getType();
+
+    /** Gets the current size (number of allocated frames) of the component. **/
+    public int getTotalAllocation();
+
+    /**
+     * Expands this memory component by the speficied number of frames
+     * 
+     * @param delta
+     *            the amount (measured in number of frames) by which this memory component
+     *            should be expanded
+     */
+    public void expand(int delta);
+
+    /** Clears the allocated frames as a step to reclaim the memory **/
+    public void reset();
+
+}



Mime
View raw message