asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject [8/9] incubator-asterixdb git commit: Cleanup Feed CodeBase
Date Sun, 15 May 2016 19:04:01 GMT
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java
index 2f338d0..a807515 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java
@@ -21,8 +21,6 @@ package org.apache.asterix.app.external;
 import java.io.PrintWriter;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.logging.Level;
-import java.util.logging.Logger;
 
 import org.apache.asterix.api.common.SessionConfig;
 import org.apache.asterix.api.common.SessionConfig.OutputFormat;
@@ -31,7 +29,6 @@ import org.apache.asterix.compiler.provider.AqlCompilationProvider;
 import org.apache.asterix.compiler.provider.ILangCompilationProvider;
 import org.apache.asterix.external.feed.api.IFeedWork;
 import org.apache.asterix.external.feed.api.IFeedWorkEventListener;
-import org.apache.asterix.external.feed.management.FeedCollectInfo;
 import org.apache.asterix.external.feed.management.FeedConnectionRequest;
 import org.apache.asterix.external.feed.management.FeedConnectionRequest.ConnectionStatus;
 import org.apache.asterix.lang.aql.statement.SubscribeFeedStatement;
@@ -39,7 +36,8 @@ import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.lang.common.statement.DataverseDecl;
 import org.apache.asterix.lang.common.struct.Identifier;
 import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.hyracks.api.job.JobId;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
 
 /**
  * A collection of feed management related task, each represented as an implementation of {@code IFeedWork}.
@@ -81,7 +79,8 @@ public class FeedWorkCollection {
             @Override
             public void run() {
                 try {
-                    PrintWriter writer = new PrintWriter(System.out, true);
+                    //TODO(amoudi): route PrintWriter to log file
+                    PrintWriter writer = new PrintWriter(System.err, true);
                     SessionConfig pc = new SessionConfig(writer, OutputFormat.ADM);
                     DataverseDecl dataverseDecl = new DataverseDecl(
                             new Identifier(request.getReceivingFeedId().getDataverse()));
@@ -92,12 +91,12 @@ public class FeedWorkCollection {
                     QueryTranslator translator = new QueryTranslator(statements, pc, compilationProvider);
                     translator.compileAndExecute(AsterixAppContextInfo.getInstance().getHcc(), null,
                             QueryTranslator.ResultDelivery.SYNC);
-                    if (LOGGER.isLoggable(Level.INFO)) {
+                    if (LOGGER.isEnabledFor(Level.INFO)) {
                         LOGGER.info("Submitted connection requests for execution: " + request);
                     }
                 } catch (Exception e) {
-                    if (LOGGER.isLoggable(Level.SEVERE)) {
-                        LOGGER.severe("Exception in executing " + request);
+                    if (LOGGER.isEnabledFor(Level.FATAL)) {
+                        LOGGER.fatal("Exception in executing " + request);
                     }
                 }
             }
@@ -107,8 +106,8 @@ public class FeedWorkCollection {
 
             @Override
             public void workFailed(IFeedWork work, Exception e) {
-                if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.warning(" Feed subscription request " + ((SubscribeFeedWork) work).request
+                if (LOGGER.isEnabledFor(Level.WARN)) {
+                    LOGGER.warn(" Feed subscription request " + ((SubscribeFeedWork) work).request
                             + " failed with exception " + e);
                 }
             }
@@ -116,8 +115,8 @@ public class FeedWorkCollection {
             @Override
             public void workCompleted(IFeedWork work) {
                 ((SubscribeFeedWork) work).request.setSubscriptionStatus(ConnectionStatus.ACTIVE);
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.warning(" Feed subscription request " + ((SubscribeFeedWork) work).request + " completed ");
+                if (LOGGER.isEnabledFor(Level.INFO)) {
+                    LOGGER.info(" Feed subscription request " + ((SubscribeFeedWork) work).request + " completed ");
                 }
             }
 
@@ -131,75 +130,5 @@ public class FeedWorkCollection {
         public String toString() {
             return "SubscribeFeedWork for [" + request + "]";
         }
-
-    }
-
-    /**
-     * The task of activating a set of feeds.
-     */
-    public static class ActivateFeedWork implements IFeedWork {
-
-        private final Runnable runnable;
-
-        @Override
-        public Runnable getRunnable() {
-            return runnable;
-        }
-
-        public ActivateFeedWork(List<FeedCollectInfo> feedsToRevive) {
-            this.runnable = new FeedsActivateRunnable(feedsToRevive);
-        }
-
-        public ActivateFeedWork() {
-            this.runnable = new FeedsActivateRunnable();
-        }
-
-        private static class FeedsActivateRunnable implements Runnable {
-
-            private List<FeedCollectInfo> feedsToRevive;
-            private Mode mode;
-
-            public enum Mode {
-                REVIVAL_POST_NODE_REJOIN
-            }
-
-            public FeedsActivateRunnable(List<FeedCollectInfo> feedsToRevive) {
-                this.feedsToRevive = feedsToRevive;
-            }
-
-            public FeedsActivateRunnable() {
-            }
-
-            @Override
-            public void run() {
-                switch (mode) {
-                    case REVIVAL_POST_NODE_REJOIN:
-                        try {
-                            Thread.sleep(10000);
-                        } catch (InterruptedException e1) {
-                            if (LOGGER.isLoggable(Level.INFO)) {
-                                LOGGER.info("Attempt to resume feed interrupted");
-                            }
-                            throw new IllegalStateException(e1.getMessage());
-                        }
-                        for (FeedCollectInfo finfo : feedsToRevive) {
-                            try {
-                                JobId jobId = AsterixAppContextInfo.getInstance().getHcc().startJob(finfo.jobSpec);
-                                if (LOGGER.isLoggable(Level.INFO)) {
-                                    LOGGER.info("Resumed feed :" + finfo.feedConnectionId + " job id " + jobId);
-                                    LOGGER.info("Job:" + finfo.jobSpec);
-                                }
-                            } catch (Exception e) {
-                                if (LOGGER.isLoggable(Level.WARNING)) {
-                                    LOGGER.warning(
-                                            "Unable to resume feed " + finfo.feedConnectionId + " " + e.getMessage());
-                                }
-                            }
-                        }
-                }
-            }
-
-        }
-
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkRequestResponseHandler.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkRequestResponseHandler.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkRequestResponseHandler.java
deleted file mode 100644
index 2dc1162..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkRequestResponseHandler.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.external;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.api.IClusterManagementWork;
-import org.apache.asterix.common.api.IClusterManagementWorkResponse;
-import org.apache.asterix.external.feed.watch.FeedConnectJobInfo;
-import org.apache.asterix.external.feed.watch.FeedIntakeInfo;
-import org.apache.asterix.external.feed.watch.FeedJobInfo;
-import org.apache.asterix.metadata.cluster.AddNodeWork;
-import org.apache.asterix.metadata.cluster.AddNodeWorkResponse;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.asterix.om.util.AsterixClusterProperties;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.constraints.Constraint;
-import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
-import org.apache.hyracks.api.constraints.expressions.ConstantExpression;
-import org.apache.hyracks.api.constraints.expressions.ConstraintExpression;
-import org.apache.hyracks.api.constraints.expressions.ConstraintExpression.ExpressionTag;
-import org.apache.hyracks.api.constraints.expressions.LValueConstraintExpression;
-import org.apache.hyracks.api.constraints.expressions.PartitionCountExpression;
-import org.apache.hyracks.api.constraints.expressions.PartitionLocationExpression;
-import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
-import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
-import org.apache.hyracks.api.job.JobSpecification;
-
-public class FeedWorkRequestResponseHandler implements Runnable {
-
-    private static final Logger LOGGER = Logger.getLogger(FeedWorkRequestResponseHandler.class.getName());
-
-    private final LinkedBlockingQueue<IClusterManagementWorkResponse> inbox;
-
-    private Map<Integer, Map<String, List<FeedJobInfo>>> feedsWaitingForResponse = new HashMap<Integer, Map<String, List<FeedJobInfo>>>();
-
-    public FeedWorkRequestResponseHandler(LinkedBlockingQueue<IClusterManagementWorkResponse> inbox) {
-        this.inbox = inbox;
-    }
-
-    @Override
-    public void run() {
-        while (true) {
-            IClusterManagementWorkResponse response = null;
-            try {
-                response = inbox.take();
-            } catch (InterruptedException e) {
-                if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.warning("Interrupted exception " + e.getMessage());
-                }
-            }
-            IClusterManagementWork submittedWork = response.getWork();
-            Map<String, String> nodeSubstitution = new HashMap<String, String>();
-            switch (submittedWork.getClusterManagementWorkType()) {
-                case ADD_NODE:
-                    AddNodeWork addNodeWork = (AddNodeWork) submittedWork;
-                    int workId = addNodeWork.getWorkId();
-                    Map<String, List<FeedJobInfo>> failureAnalysis = feedsWaitingForResponse.get(workId);
-                    AddNodeWorkResponse resp = (AddNodeWorkResponse) response;
-                    List<String> nodesAdded = resp.getNodesAdded();
-                    List<String> unsubstitutedNodes = new ArrayList<String>();
-                    unsubstitutedNodes.addAll(addNodeWork.getDeadNodes());
-                    int nodeIndex = 0;
-
-                    /** form a mapping between the failed node and its substitute **/
-                    if (nodesAdded != null && nodesAdded.size() > 0) {
-                        for (String failedNodeId : addNodeWork.getDeadNodes()) {
-                            String substitute = nodesAdded.get(nodeIndex);
-                            nodeSubstitution.put(failedNodeId, substitute);
-                            nodeIndex = (nodeIndex + 1) % nodesAdded.size();
-                            unsubstitutedNodes.remove(failedNodeId);
-                            if (LOGGER.isLoggable(Level.INFO)) {
-                                LOGGER.info("Node " + substitute + " chosen to substiute lost node " + failedNodeId);
-                            }
-                        }
-                    }
-                    if (unsubstitutedNodes.size() > 0) {
-                        String[] participantNodes = AsterixClusterProperties.INSTANCE.getParticipantNodes()
-                                .toArray(new String[] {});
-                        nodeIndex = 0;
-                        for (String unsubstitutedNode : unsubstitutedNodes) {
-                            nodeSubstitution.put(unsubstitutedNode, participantNodes[nodeIndex]);
-                            if (LOGGER.isLoggable(Level.INFO)) {
-                                LOGGER.info("Node " + participantNodes[nodeIndex] + " chosen to substiute lost node "
-                                        + unsubstitutedNode);
-                            }
-                            nodeIndex = (nodeIndex + 1) % participantNodes.length;
-                        }
-
-                        if (LOGGER.isLoggable(Level.WARNING)) {
-                            LOGGER.warning("Request " + resp.getWork() + " completed using internal nodes");
-                        }
-                    }
-
-                    // alter failed feed intake jobs
-
-                    for (Entry<String, List<FeedJobInfo>> entry : failureAnalysis.entrySet()) {
-                        String failedNode = entry.getKey();
-                        List<FeedJobInfo> impactedJobInfos = entry.getValue();
-                        for (FeedJobInfo info : impactedJobInfos) {
-                            JobSpecification spec = info.getSpec();
-                            replaceNode(spec, failedNode, nodeSubstitution.get(failedNode));
-                            info.setSpec(spec);
-                        }
-                    }
-
-                    Set<FeedIntakeInfo> revisedIntakeJobs = new HashSet<FeedIntakeInfo>();
-                    Set<FeedConnectJobInfo> revisedConnectJobInfos = new HashSet<FeedConnectJobInfo>();
-
-                    for (List<FeedJobInfo> infos : failureAnalysis.values()) {
-                        for (FeedJobInfo info : infos) {
-                            switch (info.getJobType()) {
-                                case INTAKE:
-                                    revisedIntakeJobs.add((FeedIntakeInfo) info);
-                                    break;
-                                case FEED_CONNECT:
-                                    revisedConnectJobInfos.add((FeedConnectJobInfo) info);
-                                    break;
-                            }
-                        }
-                    }
-
-                    IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
-                    try {
-                        for (FeedIntakeInfo info : revisedIntakeJobs) {
-                            hcc.startJob(info.getSpec());
-                        }
-                        Thread.sleep(2000);
-                        for (FeedConnectJobInfo info : revisedConnectJobInfos) {
-                            hcc.startJob(info.getSpec());
-                            Thread.sleep(2000);
-                        }
-                    } catch (Exception e) {
-                        if (LOGGER.isLoggable(Level.WARNING)) {
-                            LOGGER.warning("Unable to start revised job post failure");
-                        }
-                    }
-
-                    break;
-                case REMOVE_NODE:
-                    throw new IllegalStateException("Invalid work submitted");
-            }
-        }
-    }
-
-    private void replaceNode(JobSpecification jobSpec, String failedNodeId, String replacementNode) {
-        Set<Constraint> userConstraints = jobSpec.getUserConstraints();
-        List<Constraint> locationConstraintsToReplace = new ArrayList<Constraint>();
-        List<Constraint> countConstraintsToReplace = new ArrayList<Constraint>();
-        List<OperatorDescriptorId> modifiedOperators = new ArrayList<OperatorDescriptorId>();
-        Map<OperatorDescriptorId, List<Constraint>> candidateConstraints = new HashMap<OperatorDescriptorId, List<Constraint>>();
-        Map<OperatorDescriptorId, Map<Integer, String>> newConstraints = new HashMap<OperatorDescriptorId, Map<Integer, String>>();
-        OperatorDescriptorId opId = null;
-        for (Constraint constraint : userConstraints) {
-            LValueConstraintExpression lexpr = constraint.getLValue();
-            ConstraintExpression cexpr = constraint.getRValue();
-            switch (lexpr.getTag()) {
-                case PARTITION_COUNT:
-                    opId = ((PartitionCountExpression) lexpr).getOperatorDescriptorId();
-                    if (modifiedOperators.contains(opId)) {
-                        countConstraintsToReplace.add(constraint);
-                    } else {
-                        List<Constraint> clist = candidateConstraints.get(opId);
-                        if (clist == null) {
-                            clist = new ArrayList<Constraint>();
-                            candidateConstraints.put(opId, clist);
-                        }
-                        clist.add(constraint);
-                    }
-                    break;
-                case PARTITION_LOCATION:
-                    opId = ((PartitionLocationExpression) lexpr).getOperatorDescriptorId();
-                    String oldLocation = (String) ((ConstantExpression) cexpr).getValue();
-                    if (oldLocation.equals(failedNodeId)) {
-                        locationConstraintsToReplace.add(constraint);
-                        modifiedOperators.add(((PartitionLocationExpression) lexpr).getOperatorDescriptorId());
-                        Map<Integer, String> newLocs = newConstraints.get(opId);
-                        if (newLocs == null) {
-                            newLocs = new HashMap<Integer, String>();
-                            newConstraints.put(opId, newLocs);
-                        }
-                        int partition = ((PartitionLocationExpression) lexpr).getPartition();
-                        newLocs.put(partition, replacementNode);
-                    } else {
-                        if (modifiedOperators.contains(opId)) {
-                            locationConstraintsToReplace.add(constraint);
-                            Map<Integer, String> newLocs = newConstraints.get(opId);
-                            if (newLocs == null) {
-                                newLocs = new HashMap<Integer, String>();
-                                newConstraints.put(opId, newLocs);
-                            }
-                            int partition = ((PartitionLocationExpression) lexpr).getPartition();
-                            newLocs.put(partition, oldLocation);
-                        } else {
-                            List<Constraint> clist = candidateConstraints.get(opId);
-                            if (clist == null) {
-                                clist = new ArrayList<Constraint>();
-                                candidateConstraints.put(opId, clist);
-                            }
-                            clist.add(constraint);
-                        }
-                    }
-                    break;
-                default:
-                    break;
-            }
-        }
-
-        jobSpec.getUserConstraints().removeAll(locationConstraintsToReplace);
-        jobSpec.getUserConstraints().removeAll(countConstraintsToReplace);
-
-        for (OperatorDescriptorId mopId : modifiedOperators) {
-            List<Constraint> clist = candidateConstraints.get(mopId);
-            if (clist != null && !clist.isEmpty()) {
-                jobSpec.getUserConstraints().removeAll(clist);
-
-                for (Constraint c : clist) {
-                    if (c.getLValue().getTag().equals(ExpressionTag.PARTITION_LOCATION)) {
-                        ConstraintExpression cexpr = c.getRValue();
-                        int partition = ((PartitionLocationExpression) c.getLValue()).getPartition();
-                        String oldLocation = (String) ((ConstantExpression) cexpr).getValue();
-                        newConstraints.get(mopId).put(partition, oldLocation);
-                    }
-                }
-            }
-        }
-
-        for (Entry<OperatorDescriptorId, Map<Integer, String>> entry : newConstraints.entrySet()) {
-            OperatorDescriptorId nopId = entry.getKey();
-            Map<Integer, String> clist = entry.getValue();
-            IOperatorDescriptor op = jobSpec.getOperatorMap().get(nopId);
-            String[] locations = new String[clist.size()];
-            for (int i = 0; i < locations.length; i++) {
-                locations[i] = clist.get(i);
-            }
-            PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, op, locations);
-        }
-
-    }
-
-    public void registerFeedWork(int workId, Map<String, List<FeedJobInfo>> impactedJobs) {
-        feedsWaitingForResponse.put(workId, impactedJobs);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedsActivator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedsActivator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedsActivator.java
deleted file mode 100644
index 5a6d28e..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedsActivator.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.external;
-
-import java.io.PrintWriter;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.api.common.SessionConfig;
-import org.apache.asterix.api.common.SessionConfig.OutputFormat;
-import org.apache.asterix.aql.translator.QueryTranslator;
-import org.apache.asterix.compiler.provider.AqlCompilationProvider;
-import org.apache.asterix.compiler.provider.ILangCompilationProvider;
-import org.apache.asterix.external.feed.management.FeedCollectInfo;
-import org.apache.asterix.lang.common.base.Statement;
-import org.apache.asterix.lang.common.statement.ConnectFeedStatement;
-import org.apache.asterix.lang.common.statement.DataverseDecl;
-import org.apache.asterix.lang.common.struct.Identifier;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.hyracks.api.job.JobId;
-
-public class FeedsActivator implements Runnable {
-
-    private static final Logger LOGGER = Logger.getLogger(FeedJobNotificationHandler.class.getName());
-    private static final ILangCompilationProvider compilationProvider = new AqlCompilationProvider();
-
-    private List<FeedCollectInfo> feedsToRevive;
-    private Mode mode;
-
-    public enum Mode {
-        REVIVAL_POST_CLUSTER_REBOOT,
-        REVIVAL_POST_NODE_REJOIN
-    }
-
-    public FeedsActivator() {
-        this.mode = Mode.REVIVAL_POST_CLUSTER_REBOOT;
-    }
-
-    public FeedsActivator(List<FeedCollectInfo> feedsToRevive) {
-        this.feedsToRevive = feedsToRevive;
-        this.mode = Mode.REVIVAL_POST_NODE_REJOIN;
-    }
-
-    @Override
-    public void run() {
-        switch (mode) {
-            case REVIVAL_POST_CLUSTER_REBOOT:
-                //revivePostClusterReboot();
-                break;
-            case REVIVAL_POST_NODE_REJOIN:
-                try {
-                    Thread.sleep(10000);
-                } catch (InterruptedException e1) {
-                    if (LOGGER.isLoggable(Level.INFO)) {
-                        LOGGER.info("Attempt to resume feed interrupted");
-                    }
-                    throw new IllegalStateException(e1.getMessage());
-                }
-                for (FeedCollectInfo finfo : feedsToRevive) {
-                    try {
-                        JobId jobId = AsterixAppContextInfo.getInstance().getHcc().startJob(finfo.jobSpec);
-                        if (LOGGER.isLoggable(Level.INFO)) {
-                            LOGGER.info("Resumed feed :" + finfo.feedConnectionId + " job id " + jobId);
-                            LOGGER.info("Job:" + finfo.jobSpec);
-                        }
-                    } catch (Exception e) {
-                        if (LOGGER.isLoggable(Level.WARNING)) {
-                            LOGGER.warning("Unable to resume feed " + finfo.feedConnectionId + " " + e.getMessage());
-                        }
-                    }
-                }
-        }
-    }
-
-    public void reviveFeed(String dataverse, String feedName, String dataset, String feedPolicy) {
-        PrintWriter writer = new PrintWriter(System.out, true);
-        SessionConfig pc = new SessionConfig(writer, OutputFormat.ADM);
-        try {
-            DataverseDecl dataverseDecl = new DataverseDecl(new Identifier(dataverse));
-            ConnectFeedStatement stmt = new ConnectFeedStatement(new Identifier(dataverse), new Identifier(feedName),
-                    new Identifier(dataset), feedPolicy, 0);
-            stmt.setForceConnect(true);
-            List<Statement> statements = new ArrayList<Statement>();
-            statements.add(dataverseDecl);
-            statements.add(stmt);
-            QueryTranslator translator = new QueryTranslator(statements, pc, compilationProvider);
-            translator.compileAndExecute(AsterixAppContextInfo.getInstance().getHcc(), null,
-                    QueryTranslator.ResultDelivery.SYNC);
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Resumed feed: " + dataverse + ":" + dataset + " using policy " + feedPolicy);
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Exception in resuming loser feed: " + dataverse + ":" + dataset + " using policy "
-                        + feedPolicy + " Exception " + e.getMessage());
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
index 132c8c9..7a7aa40 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
@@ -40,7 +40,6 @@ import java.util.logging.Logger;
 import org.apache.asterix.api.common.APIFramework;
 import org.apache.asterix.api.common.SessionConfig;
 import org.apache.asterix.api.common.SessionConfig.OutputFormat;
-import org.apache.asterix.app.external.CentralFeedManager;
 import org.apache.asterix.app.external.ExternalIndexingOperations;
 import org.apache.asterix.app.external.FeedJoint;
 import org.apache.asterix.app.external.FeedLifecycleListener;
@@ -268,8 +267,7 @@ public class QueryTranslator extends AbstractLangTranslator {
             }
             validateOperation(activeDefaultDataverse, stmt);
             rewriteStatement(stmt); // Rewrite the statement's AST.
-            AqlMetadataProvider metadataProvider = new AqlMetadataProvider(activeDefaultDataverse,
-                    CentralFeedManager.getInstance());
+            AqlMetadataProvider metadataProvider = new AqlMetadataProvider(activeDefaultDataverse);
             metadataProvider.setWriterFactory(writerFactory);
             metadataProvider.setResultSerializerFactoryProvider(resultSerializerFactoryProvider);
             metadataProvider.setOutputFile(outputFile);
@@ -2366,18 +2364,12 @@ public class QueryTranslator extends AbstractLangTranslator {
                 throw new AsterixException(
                         "Unknown dataset :" + cfs.getDatasetName().getValue() + " in dataverse " + dataverseName);
             }
-
             Pair<JobSpecification, Boolean> specDisconnectType = FeedOperations
                     .buildDisconnectFeedJobSpec(metadataProvider, connectionId);
             JobSpecification jobSpec = specDisconnectType.first;
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
             JobUtils.runJob(hcc, jobSpec, true);
-
-            if (!specDisconnectType.second) {
-                CentralFeedManager.getInstance().getFeedLoadManager().removeFeedActivity(connectionId);
-                FeedLifecycleListener.INSTANCE.reportPartialDisconnection(connectionId);
-            }
             eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_COLLECT_ENDED);
         } catch (Exception e) {
             if (bActiveTxn) {
@@ -2418,9 +2410,7 @@ public class QueryTranslator extends AbstractLangTranslator {
         String dataset = feedConnectionId.getDatasetName();
         MetadataLockManager.INSTANCE.subscribeFeedBegin(dataverse, dataverse + "." + dataset,
                 dataverse + "." + feedConnectionId.getFeedId().getFeedName());
-
         try {
-
             JobSpecification alteredJobSpec = FeedMetadataUtil.alterJobSpecificationForFeed(compiled, feedConnectionId,
                     bfs.getSubscriptionRequest().getPolicyParameters());
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index e683ef4..4c42594 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -34,7 +34,6 @@ import org.apache.asterix.api.http.servlet.QueryStatusAPIServlet;
 import org.apache.asterix.api.http.servlet.ShutdownAPIServlet;
 import org.apache.asterix.api.http.servlet.UpdateAPIServlet;
 import org.apache.asterix.api.http.servlet.VersionAPIServlet;
-import org.apache.asterix.app.external.CentralFeedManager;
 import org.apache.asterix.app.external.ExternalLibraryUtils;
 import org.apache.asterix.app.external.FeedLifecycleListener;
 import org.apache.asterix.common.api.AsterixThreadFactory;
@@ -45,7 +44,6 @@ import org.apache.asterix.common.utils.ServletUtil.Servlets;
 import org.apache.asterix.compiler.provider.AqlCompilationProvider;
 import org.apache.asterix.compiler.provider.SqlppCompilationProvider;
 import org.apache.asterix.event.service.ILookupService;
-import org.apache.asterix.external.feed.api.ICentralFeedManager;
 import org.apache.asterix.messaging.CCMessageBroker;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.api.IAsterixStateProxy;
@@ -73,7 +71,6 @@ public class CCApplicationEntryPoint implements ICCApplicationEntryPoint {
     private Server webServer;
     private Server jsonAPIServer;
     private Server feedServer;
-    private ICentralFeedManager centralFeedManager;
 
     private static IAsterixStateProxy proxy;
     private ICCApplicationContext appCtx;
@@ -110,11 +107,7 @@ public class CCApplicationEntryPoint implements ICCApplicationEntryPoint {
 
         setupFeedServer(externalProperties);
         feedServer.start();
-
         ExternalLibraryUtils.setUpExternaLibraries(false);
-        centralFeedManager = CentralFeedManager.getInstance();
-        centralFeedManager.start();
-
         ClusterManager.INSTANCE.registerSubscriber(GlobalRecoveryManager.INSTANCE);
 
         ccAppCtx.addClusterLifecycleListener(ClusterLifecycleListener.INSTANCE);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
index 4514fee..26f4bc8 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
@@ -78,9 +78,7 @@ public class ClusterLifecycleListener implements IClusterLifecycleListener {
         Set<IClusterManagementWork> work = new HashSet<IClusterManagementWork>();
         for (IClusterEventsSubscriber sub : subscribers) {
             Set<IClusterManagementWork> workRequest = sub.notifyNodeJoin(nodeId);
-            if (workRequest != null && !workRequest.isEmpty()) {
-                work.addAll(workRequest);
-            }
+            work.addAll(workRequest);
         }
         if (!work.isEmpty()) {
             executeWorkSet(work);
@@ -104,9 +102,7 @@ public class ClusterLifecycleListener implements IClusterLifecycleListener {
         Set<IClusterManagementWork> work = new HashSet<IClusterManagementWork>();
         for (IClusterEventsSubscriber sub : subscribers) {
             Set<IClusterManagementWork> workRequest = sub.notifyNodeFailure(deadNodeIds);
-            if (workRequest != null && !workRequest.isEmpty()) {
-                work.addAll(workRequest);
-            }
+            work.addAll(workRequest);
         }
         if (!work.isEmpty()) {
             executeWorkSet(work);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/FeedBootstrap.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/FeedBootstrap.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/FeedBootstrap.java
deleted file mode 100644
index a6be075..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/FeedBootstrap.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.hyracks.bootstrap;
-
-import org.apache.asterix.app.external.CentralFeedManager;
-import org.apache.asterix.common.config.MetadataConstants;
-import org.apache.asterix.external.util.FeedConstants;
-import org.apache.asterix.om.types.BuiltinType;
-import org.apache.asterix.om.types.IAType;
-
-public class FeedBootstrap {
-
-    public static void setUpInitialArtifacts() throws Exception {
-
-        StringBuilder builder = new StringBuilder();
-        try {
-            builder.append("create dataverse " + FeedConstants.FEEDS_METADATA_DV + ";" + "\n");
-            builder.append("use dataverse " + FeedConstants.FEEDS_METADATA_DV + ";" + "\n");
-            builder.append("create type " + FeedConstants.FAILED_TUPLE_DATASET_TYPE + " as open { ");
-            String[] fieldNames = new String[] { "id", "dataverseName", "feedName", "targetDataset", "tuple", "message",
-                    "timestamp" };
-            IAType[] fieldTypes = new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
-                    BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING };
-
-            for (int i = 0; i < fieldNames.length; i++) {
-                if (i > 0) {
-                    builder.append(",");
-                }
-                builder.append(fieldNames[i] + ":");
-                builder.append(fieldTypes[i].getTypeName());
-            }
-            builder.append("}" + ";" + "\n");
-            builder.append("create dataset " + FeedConstants.FAILED_TUPLE_DATASET + " " + "("
-                    + FeedConstants.FAILED_TUPLE_DATASET_TYPE + ")" + " " + "primary key "
-                    + FeedConstants.FAILED_TUPLE_DATASET_KEY + " on  " + MetadataConstants.METADATA_NODEGROUP_NAME
-                    + ";");
-
-            CentralFeedManager.AQLExecutor.executeAQL(builder.toString());
-        } catch (Exception e) {
-            e.printStackTrace();
-            System.out.println("Error: " + builder.toString());
-            throw e;
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
index 8132d4b..af8aa31 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
@@ -18,12 +18,12 @@
  */
 package org.apache.asterix.hyracks.bootstrap;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.app.external.CentralFeedManager;
 import org.apache.asterix.app.external.ExternalIndexingOperations;
 import org.apache.asterix.common.api.IClusterManagementWork;
 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
@@ -62,13 +62,13 @@ public class GlobalRecoveryManager implements IGlobalRecoveryMaanger {
     public Set<IClusterManagementWork> notifyNodeFailure(Set<String> deadNodeIds) {
         state = AsterixClusterProperties.INSTANCE.getState();
         AsterixClusterProperties.INSTANCE.setGlobalRecoveryCompleted(false);
-        return null;
+        return Collections.emptySet();
     }
 
     @Override
     public Set<IClusterManagementWork> notifyNodeJoin(String joinedNodeId) {
         startGlobalRecovery();
-        return null;
+        return Collections.emptySet();
     }
 
     private void executeHyracksJob(JobSpecification spec) throws Exception {
@@ -106,8 +106,7 @@ public class GlobalRecoveryManager implements IGlobalRecoveryMaanger {
                         List<Dataverse> dataverses = MetadataManager.INSTANCE.getDataverses(mdTxnCtx);
                         for (Dataverse dataverse : dataverses) {
                             if (!dataverse.getDataverseName().equals(MetadataConstants.METADATA_DATAVERSE_NAME)) {
-                                AqlMetadataProvider metadataProvider = new AqlMetadataProvider(dataverse,
-                                        CentralFeedManager.getInstance());
+                                AqlMetadataProvider metadataProvider = new AqlMetadataProvider(dataverse);
                                 List<Dataset> datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx,
                                         dataverse.getDataverseName());
                                 for (Dataset dataset : datasets) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorAPIServletTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorAPIServletTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorAPIServletTest.java
index 023de30..2051a35 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorAPIServletTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorAPIServletTest.java
@@ -36,7 +36,6 @@ import javax.servlet.ServletContext;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
-import org.apache.asterix.app.external.CentralFeedManager;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.declared.AqlMetadataProvider;
@@ -172,7 +171,7 @@ public class ConnectorAPIServletTest {
     private ARecordType getMetadataRecordType(String dataverseName, String datasetName) throws Exception {
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         // Retrieves file splits of the dataset.
-        AqlMetadataProvider metadataProvider = new AqlMetadataProvider(null, CentralFeedManager.getInstance());
+        AqlMetadataProvider metadataProvider = new AqlMetadataProvider(null);
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName);
         ARecordType recordType = (ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(),

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
index c040e9d..f77b38b 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -28,6 +28,28 @@
              ResultOffsetPath="results"
              QueryOffsetPath="queries"
              QueryFileExtension=".aql">
+  <test-group name="external-library">
+    <test-case FilePath="external-library">
+      <compilation-unit name="typed_adapter">
+        <output-dir compare="Text">typed_adapter</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-library">
+      <compilation-unit name="classad-parser-new">
+        <output-dir compare="Text">classad-parser-new</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-library">
+      <compilation-unit name="classad-parser-old">
+        <output-dir compare="Text">classad-parser-old</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-library">
+      <compilation-unit name="getCapital">
+        <output-dir compare="Text">getCapital</output-dir>
+      </compilation-unit>
+    </test-case>
+  </test-group>
   <test-group name="external">
     <test-case FilePath="external">
       <compilation-unit name="invalid-format">
@@ -68,28 +90,6 @@
       </compilation-unit>
     </test-case>
   </test-group>
-  <test-group name="external-library">
-    <test-case FilePath="external-library">
-      <compilation-unit name="typed_adapter">
-        <output-dir compare="Text">typed_adapter</output-dir>
-      </compilation-unit>
-    </test-case>
-    <test-case FilePath="external-library">
-      <compilation-unit name="classad-parser-new">
-        <output-dir compare="Text">classad-parser-new</output-dir>
-      </compilation-unit>
-    </test-case>
-    <test-case FilePath="external-library">
-      <compilation-unit name="classad-parser-old">
-        <output-dir compare="Text">classad-parser-old</output-dir>
-      </compilation-unit>
-    </test-case>
-    <test-case FilePath="external-library">
-      <compilation-unit name="getCapital">
-        <output-dir compare="Text">getCapital</output-dir>
-      </compilation-unit>
-    </test-case>
-  </test-group>
   <test-group name="feeds">
     <test-case FilePath="feeds">
       <compilation-unit name="feed-with-multiple-indexes">

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/IExceptionHandler.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/IExceptionHandler.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/IExceptionHandler.java
new file mode 100644
index 0000000..e0c7f25
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/IExceptionHandler.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.exceptions;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Handles an exception encountered during processing of a data frame.
+ * When the exception is of type {@code FrameDataException}, the offending
+ * tuple is logged and a new frame containing the tuples after the exception-generating
+ * tuple is returned. This functionality is used during feed ingestion to bypass an
+ * exception-generating tuple and thus prevent the data flow from terminating.
+ */
+public interface IExceptionHandler {
+
+    /**
+     * @param e
+     *            the exception that needs to be handled
+     * @param frame
+     *            the frame that was being processed when exception occurred
+     * @return returns a new frame with tuples after the exception generating tuple
+     * @throws HyracksDataException
+     */
+    public ByteBuffer handle(HyracksDataException e, ByteBuffer frame);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/pom.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/pom.xml b/asterixdb/asterix-external-data/pom.xml
index 7b25f5a..5217846 100644
--- a/asterixdb/asterix-external-data/pom.xml
+++ b/asterixdb/asterix-external-data/pom.xml
@@ -137,7 +137,7 @@
     </plugins>
     <pluginManagement>
       <plugins>
-        <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
+                <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
         <plugin>
           <groupId>org.eclipse.m2e</groupId>
           <artifactId>lifecycle-mapping</artifactId>
@@ -147,8 +147,8 @@
               <pluginExecutions>
                 <pluginExecution>
                   <pluginExecutionFilter>
-                    <groupId>org.apache.asterix</groupId>
-                    <artifactId>lexer-generator-maven-plugin</artifactId>
+                    <groupId> org.apache.asterix</groupId>
+                    <artifactId> lexer-generator-maven-plugin</artifactId>
                     <versionRange>[0.1,)</versionRange>
                     <goals>
                       <goal>generate-lexer</goal>
@@ -162,7 +162,7 @@
                 </pluginExecution>
                 <pluginExecution>
                   <pluginExecutionFilter>
-                    <groupId>org.codehaus.mojo</groupId>
+                    <groupId> org.codehaus.mojo</groupId>
                     <artifactId>build-helper-maven-plugin</artifactId>
                     <versionRange>[1.7,)</versionRange>
                     <goals>
@@ -170,7 +170,7 @@
                     </goals>
                   </pluginExecutionFilter>
                   <action>
-                    <ignore/>
+                    <ignore />
                   </action>
                 </pluginExecution>
               </pluginExecutions>
@@ -194,6 +194,12 @@
       <scope>compile</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-test-support</artifactId>
+      <version>0.2.18-SNAPSHOT</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.asterix</groupId>
       <artifactId>asterix-runtime</artifactId>
       <version>0.8.9-SNAPSHOT</version>
@@ -289,5 +295,28 @@
       <artifactId>core-io</artifactId>
       <version>1.2.7</version>
     </dependency>
+    <dependency>
+      <groupId>io.reactivex</groupId>
+      <artifactId>rxjava</artifactId>
+      <version>1.0.15</version>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <version>2.0.2-beta</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.powermock</groupId>
+      <artifactId>powermock-api-mockito</artifactId>
+      <version>1.6.2</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.powermock</groupId>
+      <artifactId>powermock-module-junit4</artifactId>
+      <version>1.6.2</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
-</project>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterRuntimeManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterRuntimeManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterRuntimeManager.java
deleted file mode 100644
index e5b22e9..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterRuntimeManager.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.api;
-
-import org.apache.asterix.external.dataset.adapter.FeedAdapter;
-import org.apache.asterix.external.feed.api.IIntakeProgressTracker;
-import org.apache.asterix.external.feed.management.FeedId;
-
-public interface IAdapterRuntimeManager {
-
-    public enum State {
-        /**
-         * Indicates that AsterixDB is maintaining the flow of data from external source into its storage.
-         */
-        ACTIVE_INGESTION,
-
-        /**
-         * Indicates that data from external source is being buffered and not
-         * pushed downstream
-         */
-
-        INACTIVE_INGESTION,
-        /**
-         * Indicates that feed ingestion activity has finished.
-         */
-        FINISHED_INGESTION,
-
-        /** Indicates the occurrence of a failure during the intake stage of a data ingestion pipeline **/
-        FAILED_INGESTION
-    }
-
-    /**
-     * Start feed ingestion
-     * @throws Exception
-     */
-    public void start() throws Exception;
-
-    /**
-     * Stop feed ingestion.
-     * @throws Exception
-     */
-    public void stop() throws Exception;
-
-    /**
-     * @return feedId associated with the feed that is being ingested.
-     */
-    public FeedId getFeedId();
-
-    /**
-     * @return an instance of the {@code FeedAdapter} in use.
-     */
-    public FeedAdapter getFeedAdapter();
-
-    /**
-     * @return state associated with the AdapterRuntimeManager. See {@code State}.
-     */
-    public State getState();
-
-    /**
-     * @param state
-     */
-    public void setState(State state);
-
-    public IIntakeProgressTracker getProgressTracker();
-
-    public int getPartition();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index ac30172..b09bef9 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -92,13 +92,13 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl
         try {
             tupleForwarder.close();
         } catch (Throwable th) {
-            hde = ExternalDataExceptionUtils.suppress(hde, th);
+            hde = ExternalDataExceptionUtils.suppressIntoHyracksDataException(hde, th);
         }
         try {
             recordReader.close();
         } catch (Throwable th) {
             LOGGER.warn("Failure during while operating a feed sourcec", th);
-            hde = ExternalDataExceptionUtils.suppress(hde, th);
+            hde = ExternalDataExceptionUtils.suppressIntoHyracksDataException(hde, th);
         } finally {
             closeSignal();
             if (hde != null) {
@@ -137,12 +137,12 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl
                 try {
                     tupleForwarder.close();
                 } catch (Throwable th) {
-                    hde = ExternalDataExceptionUtils.suppress(hde, th);
+                    hde = ExternalDataExceptionUtils.suppressIntoHyracksDataException(hde, th);
                 }
                 try {
                     recordReader.close();
                 } catch (Throwable th) {
-                    hde = ExternalDataExceptionUtils.suppress(hde, th);
+                    hde = ExternalDataExceptionUtils.suppressIntoHyracksDataException(hde, th);
                 }
                 if (hde != null) {
                     throw hde;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
index 60aaea2..7ae2f41 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
@@ -95,13 +95,30 @@ public class FeedTupleForwarder implements ITupleForwarder {
 
     @Override
     public void close() throws HyracksDataException {
-        if (appender.getTupleCount() > 0) {
-            FrameUtils.flushFrame(frame.getBuffer(), writer);
-        }
+        Throwable throwable = null;
         try {
-            feedLogManager.close();
-        } catch (IOException e) {
-            throw new HyracksDataException(e);
+            if (appender.getTupleCount() > 0) {
+                FrameUtils.flushFrame(frame.getBuffer(), writer);
+            }
+        } catch (Throwable th) {
+            throwable = th;
+            throw th;
+        } finally {
+            try {
+                feedLogManager.close();
+            } catch (IOException e) {
+                if (throwable != null) {
+                    throwable.addSuppressed(e);
+                } else {
+                    throw new HyracksDataException(e);
+                }
+            } catch (Throwable th) {
+                if (throwable != null) {
+                    throwable.addSuppressed(th);
+                } else {
+                    throw th;
+                }
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ICentralFeedManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ICentralFeedManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ICentralFeedManager.java
deleted file mode 100644
index 4f0ed77..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ICentralFeedManager.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.api;
-
-import java.io.IOException;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-
-public interface ICentralFeedManager {
-
-    public void start() throws AsterixException;
-
-    public void stop() throws AsterixException, IOException;
-
-    public IFeedTrackingManager getFeedTrackingManager();
-
-    public IFeedLoadManager getFeedLoadManager();
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IExceptionHandler.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IExceptionHandler.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IExceptionHandler.java
deleted file mode 100644
index ec0af1c..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IExceptionHandler.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.api;
-
-import java.nio.ByteBuffer;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-/**
- * Handles an exception encountered during processing of a data frame.
- * In the case when the exception is of type {@code FrameDataException}, the causing
- * tuple is logged and a new frame with tuple after the exception-generating tuple
- * is returned. This funcitonality is used during feed ingestion to bypass an exception
- * generating tuple and thus avoid the data flow from terminating
- */
-public interface IExceptionHandler {
-
-    /**
-     * @param e
-     *            the exception that needs to be handled
-     * @param frame
-     *            the frame that was being processed when exception occurred
-     * @return returns a new frame with tuples after the exception generating tuple
-     * @throws HyracksDataException
-     */
-    public ByteBuffer handleException(Exception e, ByteBuffer frame);
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedConnectionManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedConnectionManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedConnectionManager.java
index 32d551a..503715b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedConnectionManager.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedConnectionManager.java
@@ -37,7 +37,7 @@ public interface IFeedConnectionManager {
      * @param feedRuntime
      * @throws Exception
      */
-    public void registerFeedRuntime(FeedConnectionId connectionId, FeedRuntime feedRuntime) throws Exception;
+    public void registerFeedRuntime(FeedConnectionId connectionId, FeedRuntime feedRuntime);
 
     /**
      * Obtain feed runtime corresponding to a feedRuntimeId

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedFrameHandler.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedFrameHandler.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedFrameHandler.java
deleted file mode 100644
index 3b37500..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedFrameHandler.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.api;
-
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-
-import org.apache.asterix.external.feed.dataflow.DataBucket;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IFeedFrameHandler {
-
-    public void handleFrame(ByteBuffer frame) throws HyracksDataException, InterruptedException;
-
-    public void handleDataBucket(DataBucket bucket) throws InterruptedException;
-
-    public void close();
-
-    public Iterator<ByteBuffer> replayData() throws HyracksDataException;
-
-    public String getSummary();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleIntakeEventSubscriber.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleIntakeEventSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleIntakeEventSubscriber.java
deleted file mode 100644
index b9caa0d..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleIntakeEventSubscriber.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.api;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.external.feed.watch.FeedIntakeInfo;
-
-public interface IFeedLifecycleIntakeEventSubscriber extends IFeedLifecycleEventSubscriber {
-
-    public void handleFeedEvent(FeedIntakeInfo iInfo, FeedLifecycleEvent event) throws AsterixException;
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleListener.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleListener.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleListener.java
index 448ea47..3302856 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleListener.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleListener.java
@@ -20,13 +20,12 @@ package org.apache.asterix.external.feed.api;
 
 import java.util.List;
 
-import org.apache.asterix.common.api.IClusterEventsSubscriber;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.asterix.external.feed.management.FeedId;
 import org.apache.asterix.external.feed.management.FeedJointKey;
 import org.apache.hyracks.api.job.IJobLifecycleListener;
 
-public interface IFeedLifecycleListener extends IJobLifecycleListener, IClusterEventsSubscriber {
+public interface IFeedLifecycleListener extends IJobLifecycleListener {
     public IFeedJoint getAvailableFeedJoint(FeedJointKey feedJoinKey);
 
     public boolean isFeedJointAvailable(FeedJointKey feedJoinKey);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLoadManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLoadManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLoadManager.java
deleted file mode 100644
index 1b6347a..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLoadManager.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.api;
-
-import java.util.Collection;
-import java.util.List;
-
-import org.json.JSONException;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.message.FeedCongestionMessage;
-import org.apache.asterix.external.feed.message.FeedReportMessage;
-import org.apache.asterix.external.feed.message.ScaleInReportMessage;
-import org.apache.asterix.external.feed.message.ThrottlingEnabledFeedMessage;
-import org.apache.asterix.external.feed.watch.FeedActivity;
-import org.apache.asterix.external.feed.watch.NodeLoadReport;
-
-public interface IFeedLoadManager {
-
-    public void submitNodeLoadReport(NodeLoadReport report);
-
-    public void reportCongestion(FeedCongestionMessage message) throws JSONException, AsterixException;
-
-    public void submitFeedRuntimeReport(FeedReportMessage message);
-
-    public void submitScaleInPossibleReport(ScaleInReportMessage sm) throws AsterixException, Exception;
-
-    public List<String> getNodes(int required);
-
-    public void reportThrottlingEnabled(ThrottlingEnabledFeedMessage mesg) throws AsterixException, Exception;
-
-    int getOutflowRate(FeedConnectionId connectionId, FeedRuntimeType runtimeType);
-
-    void reportFeedActivity(FeedConnectionId connectionId, FeedActivity activity);
-
-    void removeFeedActivity(FeedConnectionId connectionId);
-
-    public FeedActivity getFeedActivity(FeedConnectionId connectionId);
-
-    public Collection<FeedActivity> getFeedActivities();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedManager.java
deleted file mode 100644
index b3ad0a5..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedManager.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.api;
-
-import org.apache.asterix.common.config.AsterixFeedProperties;
-
-/**
- * Provides access to services related to feed management within a node controller
- */
-public interface IFeedManager {
-
-    /**
-     * gets the handle to the singleton instance of subscription manager
-     * @return the singleton instance of subscription manager
-     * @see IFeedSubscriptionManager
-     */
-    public IFeedSubscriptionManager getFeedSubscriptionManager();
-
-    /**
-     * gets the handle to the singleton instance of connection manager
-     * @return the singleton instance of connection manager
-     * @see IFeedConnectionManager
-     */
-    public IFeedConnectionManager getFeedConnectionManager();
-
-    /**
-     * gets the handle to the singleton instance of memory manager
-     * @return the singleton instance of memory manager
-     * @see IFeedMemoryManager
-     */
-    public IFeedMemoryManager getFeedMemoryManager();
-
-    /**
-     * gets the handle to the singleton instance of feed metadata manager
-     * @return the singleton instance of feed metadata manager
-     * @see IFeedMetadataManager
-     */
-    public IFeedMetadataManager getFeedMetadataManager();
-
-    /**
-     * gets the handle to the singleton instance of feed metric collector
-     * @return the singleton instance of feed metric collector
-     * @see IFeedMetricCollector
-     */
-    public IFeedMetricCollector getFeedMetricCollector();
-
-    /**
-     * gets the handle to the singleton instance of feed message service
-     * @return the singleton instance of feed message service
-     * @see IFeedMessageService
-     */
-    public IFeedMessageService getFeedMessageService();
-
-    public AsterixFeedProperties getAsterixFeedProperties();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedMemoryComponent.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedMemoryComponent.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedMemoryComponent.java
deleted file mode 100644
index 8e25b69..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedMemoryComponent.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.api;
-
-/**
- * Represents an in-memory component required for storing frames that contain feed tuples.
- * The component's memory footprint is measured and regulated by the {@link IFeedMemoryManager}.
- * Any expansion in size is accounted and can be restricted by the {@link IFeedMemoryManager}
- **/
-public interface IFeedMemoryComponent {
-
-    public enum Type {
-
-        /** A pool of reusable frames **/
-        POOL,
-
-        /** An ordered list of frames **/
-        COLLECTION
-    }
-
-    /** Gets the unique id associated with the memory component **/
-    public int getComponentId();
-
-    /** Gets the type associated with the component. **/
-    public Type getType();
-
-    /** Gets the current size (number of allocated frames) of the component. **/
-    public int getTotalAllocation();
-
-    /**
-     * Expands this memory component by the specified number of frames
-     *
-     * @param delta
-     *            the amount (measured in number of frames) by which this memory component
-     *            should be expanded
-     */
-    public void expand(int delta);
-
-    /** Clears the allocated frames as a step to reclaim the memory **/
-    public void reset();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedMemoryManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedMemoryManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedMemoryManager.java
deleted file mode 100644
index 508602c..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedMemoryManager.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.api;
-
-import org.apache.asterix.external.feed.api.IFeedMemoryComponent.Type;
-
-/**
- * Provides management of memory allocated for handling feed data flow through the node controller
- */
-public interface IFeedMemoryManager {
-
-    public static final int START_COLLECTION_SIZE = 20;
-    public static final int START_POOL_SIZE = 10;
-
-    /**
-     * Gets a memory component allocated from the feed memory budget
-     *
-     * @param type
-     *            the kind of memory component that needs to be allocated
-     * @return
-     * @see Type
-     */
-    public IFeedMemoryComponent getMemoryComponent(Type type);
-
-    /**
-     * Expand a memory component by the default increment
-     *
-     * @param memoryComponent
-     * @return true if the expansion succeeded
-     *         false if the requested expansion violates the configured budget
-     */
-    public boolean expandMemoryComponent(IFeedMemoryComponent memoryComponent);
-
-    /**
-     * Releases the given memory component to reclaim the memory allocated for the component
-     *
-     * @param memoryComponent
-     *            the memory component that is being reclaimed/released
-     */
-    public void releaseMemoryComponent(IFeedMemoryComponent memoryComponent);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedMessage.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedMessage.java
index bc2c938..15e2de6 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedMessage.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedMessage.java
@@ -28,18 +28,7 @@ import org.apache.hyracks.api.dataflow.value.JSONSerializable;
 public interface IFeedMessage extends Serializable, JSONSerializable {
 
     public enum MessageType {
-        END,
-        XAQL,
-        FEED_REPORT,
-        NODE_REPORT,
-        STORAGE_REPORT,
-        CONGESTION,
-        PREPARE_STALL,
-        TERMINATE_FLOW,
-        SCALE_IN_REQUEST,
-        COMMIT_ACK,
-        COMMIT_ACK_RESPONSE,
-        THROTTLING_ENABLED
+        END
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedMessageService.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedMessageService.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedMessageService.java
deleted file mode 100644
index a98cf2b..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedMessageService.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.api;
-
-/**
- * Provides the functionality of sending a message ({@code IFeedMessage}) to the {@code CentralFeedManager}
- */
-public interface IFeedMessageService extends IFeedService {
-
-    /**
-     * Sends a message ({@code IFeedMessage}) to the {@code CentralFeedManager} running at the CC
-     * The message is sent asynchronously.
-     *
-     * @param message
-     *            the message to be sent
-     */
-    public void sendMessage(IFeedMessage message);
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedMetadataManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedMetadataManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedMetadataManager.java
deleted file mode 100644
index 3712678..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedMetadataManager.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.api;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-
-public interface IFeedMetadataManager {
-
-    /**
-     * @param feedConnectionId
-     *            connection id corresponding to the feed connection
-     * @param tuple
-     *            the erroneous tuple that raised an exception
-     * @param message
-     *            the message corresponding to the exception being raised
-     * @param feedManager
-     * @throws AsterixException
-     */
-    public void logTuple(FeedConnectionId feedConnectionId, String tuple, String message, IFeedManager feedManager)
-            throws AsterixException;
-
-}



Mime
View raw message