asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From buyin...@apache.org
Subject [09/16] asterixdb git commit: Add Asterix Extension Manager
Date Sat, 20 Aug 2016 06:15:51 GMT
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-app/src/main/java/org/apache/asterix/drivers/AsterixCLI.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/drivers/AsterixCLI.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/drivers/AsterixCLI.java
index 821822f..486b4ab 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/drivers/AsterixCLI.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/drivers/AsterixCLI.java
@@ -25,6 +25,7 @@ import java.util.List;
 
 import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
 import org.apache.asterix.api.java.AsterixJavaClient;
+import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
 import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.compiler.provider.AqlCompilationProvider;
 import org.apache.asterix.compiler.provider.ILangCompilationProvider;
@@ -56,8 +57,8 @@ public class AsterixCLI {
         try {
             for (String queryFile : options.args) {
                 Reader in = new FileReader(queryFile);
-                AsterixJavaClient ajc = new AsterixJavaClient(
-                        integrationUtil.getHyracksClientConnection(), in, compilationProvider);
+                AsterixJavaClient ajc = new AsterixJavaClient(integrationUtil.getHyracksClientConnection(), in,
+                        compilationProvider, new DefaultStatementExecutorFactory(null));
                 try {
                     ajc.compile(true, false, false, false, false, true, false);
                 } finally {
@@ -77,8 +78,9 @@ public class AsterixCLI {
         outdir.mkdirs();
 
         File log = new File("asterix_logs");
-        if (log.exists())
+        if (log.exists()) {
             FileUtils.deleteDirectory(log);
+        }
         File lsn = new File("last_checkpoint_lsn");
         lsn.deleteOnExit();
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-app/src/main/java/org/apache/asterix/drivers/AsterixClientDriver.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/drivers/AsterixClientDriver.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/drivers/AsterixClientDriver.java
index 2456bc2..e0a60c6 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/drivers/AsterixClientDriver.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/drivers/AsterixClientDriver.java
@@ -22,6 +22,7 @@ import java.io.FileReader;
 
 import org.apache.asterix.api.common.AsterixClientConfig;
 import org.apache.asterix.api.java.AsterixJavaClient;
+import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
 import org.apache.asterix.compiler.provider.AqlCompilationProvider;
 import org.apache.asterix.compiler.provider.ILangCompilationProvider;
 import org.apache.hyracks.api.client.HyracksConnection;
@@ -62,7 +63,8 @@ public class AsterixClientDriver {
             boolean onlyPhysical, boolean createBinaryRuntime) throws Exception {
         ILangCompilationProvider compilationProvider = new AqlCompilationProvider();
         FileReader reader = new FileReader(filename);
-        AsterixJavaClient q = new AsterixJavaClient(hcc, reader, compilationProvider);
+        AsterixJavaClient q = new AsterixJavaClient(hcc, reader, compilationProvider,
+                new DefaultStatementExecutorFactory(null));
         q.compile(optimize, true, true, true, onlyPhysical, createBinaryRuntime, createBinaryRuntime);
         return q;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-app/src/main/java/org/apache/asterix/drivers/AsterixWebServer.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/drivers/AsterixWebServer.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/drivers/AsterixWebServer.java
index f4c8062..8bcceca 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/drivers/AsterixWebServer.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/drivers/AsterixWebServer.java
@@ -19,18 +19,22 @@
 package org.apache.asterix.drivers;
 
 import org.apache.asterix.api.http.servlet.APIServlet;
+import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
+import org.apache.asterix.compiler.provider.AqlCompilationProvider;
+import org.apache.asterix.compiler.provider.SqlppCompilationProvider;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
 
 public class AsterixWebServer {
-    public static void main(String args[]) throws Exception {
+    public static void main(String[] args) throws Exception {
         Server server = new Server(8080);
         ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
         context.setContextPath("/");
         server.setHandler(context);
 
-        context.addServlet(new ServletHolder(new APIServlet()), "/*");
+        context.addServlet(new ServletHolder(new APIServlet(new AqlCompilationProvider(),
+                new SqlppCompilationProvider(), new DefaultStatementExecutorFactory(null))), "/*");
         server.start();
         server.join();
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index 3ab6f3d..cf57174 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -18,6 +18,9 @@
  */
 package org.apache.asterix.hyracks.bootstrap;
 
+import static org.apache.asterix.api.http.servlet.ServletConstants.ASTERIX_BUILD_PROP_ATTR;
+import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.logging.Level;
@@ -39,6 +42,7 @@ import org.apache.asterix.api.http.servlet.QueryStatusAPIServlet;
 import org.apache.asterix.api.http.servlet.ShutdownAPIServlet;
 import org.apache.asterix.api.http.servlet.UpdateAPIServlet;
 import org.apache.asterix.api.http.servlet.VersionAPIServlet;
+import org.apache.asterix.app.cc.CompilerExtensionManager;
 import org.apache.asterix.app.external.ActiveLifecycleListener;
 import org.apache.asterix.app.external.ExternalLibraryUtils;
 import org.apache.asterix.common.api.AsterixThreadFactory;
@@ -47,8 +51,6 @@ import org.apache.asterix.common.config.AsterixExternalProperties;
 import org.apache.asterix.common.config.AsterixMetadataProperties;
 import org.apache.asterix.common.library.ILibraryManager;
 import org.apache.asterix.common.utils.ServletUtil.Servlets;
-import org.apache.asterix.compiler.provider.AqlCompilationProvider;
-import org.apache.asterix.compiler.provider.SqlppCompilationProvider;
 import org.apache.asterix.event.service.ILookupService;
 import org.apache.asterix.external.library.ExternalLibraryManager;
 import org.apache.asterix.messaging.CCMessageBroker;
@@ -69,9 +71,6 @@ import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
 
-import static org.apache.asterix.api.http.servlet.ServletConstants.ASTERIX_BUILD_PROP_ATTR;
-import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
-
 public class CCApplicationEntryPoint implements ICCApplicationEntryPoint {
 
     private static final Logger LOGGER = Logger.getLogger(CCApplicationEntryPoint.class.getName());
@@ -80,6 +79,7 @@ public class CCApplicationEntryPoint implements ICCApplicationEntryPoint {
 
     private static IAsterixStateProxy proxy;
     protected ICCApplicationContext appCtx;
+    protected CompilerExtensionManager ccExtensionManager;
 
     @Override
     public void start(ICCApplicationContext ccAppCtx, String[] args) throws Exception {
@@ -91,22 +91,25 @@ public class CCApplicationEntryPoint implements ICCApplicationEntryPoint {
         }
 
         appCtx.setThreadFactory(new AsterixThreadFactory(new LifeCycleComponentManager()));
-        GlobalRecoveryManager.INSTANCE = new GlobalRecoveryManager((HyracksConnection) getNewHyracksClientConnection());
+        GlobalRecoveryManager.instantiate((HyracksConnection) getNewHyracksClientConnection());
         ILibraryManager libraryManager = new ExternalLibraryManager();
         ExternalLibraryUtils.setUpExternaLibraries(libraryManager, false);
-        AsterixAppContextInfo.initialize(appCtx, getNewHyracksClientConnection(), GlobalRecoveryManager.INSTANCE,
+        AsterixAppContextInfo.initialize(appCtx, getNewHyracksClientConnection(), GlobalRecoveryManager.instance(),
                 libraryManager);
+        ccExtensionManager = new CompilerExtensionManager(
+                AsterixAppContextInfo.getInstance().getExtensionProperties().getExtensions());
+        AsterixAppContextInfo.getInstance().setExtensionManager(ccExtensionManager);
 
         if (System.getProperty("java.rmi.server.hostname") == null) {
             System.setProperty("java.rmi.server.hostname",
                     ((ClusterControllerService) ccAppCtx.getControllerService()).getCCConfig().clusterNetIpAddress);
         }
 
-        proxy = AsterixStateProxy.registerRemoteObject();
+        setAsterixStateProxy(AsterixStateProxy.registerRemoteObject());
         appCtx.setDistributedState(proxy);
 
         AsterixMetadataProperties metadataProperties = AsterixAppContextInfo.getInstance().getMetadataProperties();
-        MetadataManager.INSTANCE = new MetadataManager(proxy, metadataProperties);
+        MetadataManager.instantiate(new MetadataManager(proxy, metadataProperties));
 
         AsterixAppContextInfo.getInstance().getCCApplicationContext()
                 .addJobLifecycleListener(ActiveLifecycleListener.INSTANCE);
@@ -117,7 +120,7 @@ public class CCApplicationEntryPoint implements ICCApplicationEntryPoint {
             server.start();
         }
 
-        ClusterManager.INSTANCE.registerSubscriber(GlobalRecoveryManager.INSTANCE);
+        ClusterManager.INSTANCE.registerSubscriber(GlobalRecoveryManager.instance());
 
         ccAppCtx.addClusterLifecycleListener(ClusterLifecycleListener.INSTANCE);
         ccAppCtx.setMessageBroker(messageBroker);
@@ -168,7 +171,9 @@ public class CCApplicationEntryPoint implements ICCApplicationEntryPoint {
         context.setAttribute(HYRACKS_CONNECTION_ATTR, hcc);
 
         webServer.setHandler(context);
-        context.addServlet(new ServletHolder(new APIServlet()), "/*");
+        context.addServlet(new ServletHolder(new APIServlet(ccExtensionManager.getAqlCompilationProvider(),
+                ccExtensionManager.getSqlppCompilationProvider(), ccExtensionManager.getQueryTranslatorFactory())),
+                "/*");
 
         return webServer;
     }
@@ -235,27 +240,36 @@ public class CCApplicationEntryPoint implements ICCApplicationEntryPoint {
     protected Servlet createServlet(Servlets key) {
         switch (key) {
             case AQL:
-                return new AQLAPIServlet(new AqlCompilationProvider());
+                return new AQLAPIServlet(ccExtensionManager.getAqlCompilationProvider(),
+                        ccExtensionManager.getQueryTranslatorFactory());
             case AQL_QUERY:
-                return new QueryAPIServlet(new AqlCompilationProvider());
+                return new QueryAPIServlet(ccExtensionManager.getAqlCompilationProvider(),
+                        ccExtensionManager.getQueryTranslatorFactory());
             case AQL_UPDATE:
-                return new UpdateAPIServlet(new AqlCompilationProvider());
+                return new UpdateAPIServlet(ccExtensionManager.getAqlCompilationProvider(),
+                        ccExtensionManager.getQueryTranslatorFactory());
             case AQL_DDL:
-                return new DDLAPIServlet(new AqlCompilationProvider());
+                return new DDLAPIServlet(ccExtensionManager.getAqlCompilationProvider(),
+                        ccExtensionManager.getQueryTranslatorFactory());
             case SQLPP:
-                return new AQLAPIServlet(new SqlppCompilationProvider());
+                return new AQLAPIServlet(ccExtensionManager.getSqlppCompilationProvider(),
+                        ccExtensionManager.getQueryTranslatorFactory());
             case SQLPP_QUERY:
-                return new QueryAPIServlet(new SqlppCompilationProvider());
+                return new QueryAPIServlet(ccExtensionManager.getSqlppCompilationProvider(),
+                        ccExtensionManager.getQueryTranslatorFactory());
             case SQLPP_UPDATE:
-                return new UpdateAPIServlet(new SqlppCompilationProvider());
+                return new UpdateAPIServlet(ccExtensionManager.getSqlppCompilationProvider(),
+                        ccExtensionManager.getQueryTranslatorFactory());
             case SQLPP_DDL:
-                return new DDLAPIServlet(new SqlppCompilationProvider());
+                return new DDLAPIServlet(ccExtensionManager.getSqlppCompilationProvider(),
+                        ccExtensionManager.getQueryTranslatorFactory());
             case QUERY_STATUS:
                 return new QueryStatusAPIServlet();
             case QUERY_RESULT:
                 return new QueryResultAPIServlet();
             case QUERY_SERVICE:
-                return new QueryServiceServlet(new SqlppCompilationProvider());
+                return new QueryServiceServlet(ccExtensionManager.getSqlppCompilationProvider(),
+                        ccExtensionManager.getQueryTranslatorFactory());
             case CONNECTOR:
                 return new ConnectorAPIServlet();
             case SHUTDOWN:
@@ -294,4 +308,8 @@ public class CCApplicationEntryPoint implements ICCApplicationEntryPoint {
                     ClusterState.ACTIVE);
         }
     }
-}
\ No newline at end of file
+
+    public static synchronized void setAsterixStateProxy(IAsterixStateProxy proxy) {
+        CCApplicationEntryPoint.proxy = proxy;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
index af8aa31..0d5cfb2 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
@@ -32,7 +32,6 @@ import org.apache.asterix.common.cluster.IGlobalRecoveryMaanger;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.DatasetConfig.ExternalDatasetTransactionState;
 import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
-import org.apache.asterix.common.config.MetadataConstants;
 import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
@@ -41,6 +40,7 @@ import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.Dataverse;
 import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
 import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.metadata.utils.MetadataConstants;
 import org.apache.asterix.om.util.AsterixClusterProperties;
 import org.apache.hyracks.api.client.HyracksConnection;
 import org.apache.hyracks.api.job.JobId;
@@ -48,19 +48,19 @@ import org.apache.hyracks.api.job.JobSpecification;
 
 public class GlobalRecoveryManager implements IGlobalRecoveryMaanger {
 
-    private static ClusterState state;
     private static final Logger LOGGER = Logger.getLogger(GlobalRecoveryManager.class.getName());
+    private static GlobalRecoveryManager instance;
+    private static ClusterState state;
     private HyracksConnection hcc;
-    public static GlobalRecoveryManager INSTANCE;
 
-    public GlobalRecoveryManager(HyracksConnection hcc) throws Exception {
-        state = ClusterState.UNUSABLE;
+    private GlobalRecoveryManager(HyracksConnection hcc) {
+        setState(ClusterState.UNUSABLE);
         this.hcc = hcc;
     }
 
     @Override
     public Set<IClusterManagementWork> notifyNodeFailure(Set<String> deadNodeIds) {
-        state = AsterixClusterProperties.INSTANCE.getState();
+        setState(AsterixClusterProperties.INSTANCE.getState());
         AsterixClusterProperties.INSTANCE.setGlobalRecoveryCompleted(false);
         return Collections.emptySet();
     }
@@ -115,7 +115,7 @@ public class GlobalRecoveryManager implements IGlobalRecoveryMaanger {
                                         // Get indexes
                                         List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx,
                                                 dataset.getDataverseName(), dataset.getDatasetName());
-                                        if (indexes.size() > 0) {
+                                        if (!indexes.isEmpty()) {
                                             // Get the state of the dataset
                                             ExternalDatasetDetails dsd = (ExternalDatasetDetails) dataset
                                                     .getDatasetDetails();
@@ -217,8 +217,20 @@ public class GlobalRecoveryManager implements IGlobalRecoveryMaanger {
                     LOGGER.info("Global Recovery Completed");
                 }
             });
-            state = newState;
+            setState(newState);
             recoveryThread.start();
         }
     }
+
+    public static GlobalRecoveryManager instance() {
+        return instance;
+    }
+
+    public static synchronized void instantiate(HyracksConnection hcc) {
+        instance = new GlobalRecoveryManager(hcc);
+    }
+
+    public static synchronized void setState(ClusterState state) {
+        GlobalRecoveryManager.state = state;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index 4cb9d70..5abe6bc 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -25,8 +25,8 @@ import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.api.common.AsterixAppRuntimeContext;
 import org.apache.asterix.app.external.ExternalLibraryUtils;
+import org.apache.asterix.app.nc.AsterixNCAppRuntimeContext;
 import org.apache.asterix.common.api.AsterixThreadFactory;
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.common.config.AsterixMetadataProperties;
@@ -100,8 +100,7 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
                     ((NodeControllerService) ncAppCtx.getControllerService())
                             .getConfiguration().clusterNetPublicIPAddress);
         }
-
-        runtimeContext = new AsterixAppRuntimeContext(ncApplicationContext, metadataRmiPort);
+        runtimeContext = new AsterixNCAppRuntimeContext(ncApplicationContext, metadataRmiPort);
         AsterixMetadataProperties metadataProperties = ((IAsterixPropertiesProvider) runtimeContext)
                 .getMetadataProperties();
         if (!metadataProperties.getNodeNames().contains(ncApplicationContext.getNodeId())) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
index 138b620..27a5365 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
@@ -25,7 +25,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.active.ActivePartitionMessage;
+import org.apache.asterix.active.message.ActivePartitionMessage;
 import org.apache.asterix.app.external.ActiveLifecycleListener;
 import org.apache.asterix.common.messaging.AbstractApplicationMessage;
 import org.apache.asterix.common.messaging.CompleteFailbackResponseMessage;
@@ -81,7 +81,7 @@ public class CCMessageBroker implements ICCMessageBroker {
             case COMPLETE_FAILBACK_RESPONSE:
                 handleCompleteFailbcakResponse(message);
                 break;
-            case ACTIVE_ENTITY_MESSAGE:
+            case ACTIVE_ENTITY_TO_CC_MESSAGE:
                 handleActiveEntityMessage(message);
                 break;
             default:

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
index 13b0189..74a5ba2 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
@@ -24,6 +24,8 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.asterix.active.ActiveManager;
+import org.apache.asterix.active.message.ActiveManagerMessage;
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.common.messaging.AbstractApplicationMessage;
 import org.apache.asterix.common.messaging.CompleteFailbackRequestMessage;
@@ -113,6 +115,9 @@ public class NCMessageBroker implements INCMessageBroker {
                 case REPLICA_EVENT:
                     handleReplicaEvent(message);
                     break;
+                case ACTIVE_MANAGER_MESSAGE:
+                    ((ActiveManager) appContext.getActiveManager()).submit((ActiveManagerMessage) message);
+                    break;
                 default:
                     break;
             }
@@ -143,8 +148,8 @@ public class NCMessageBroker implements INCMessageBroker {
             appContext.initializeMetadata(false);
             appContext.exportMetadataNodeStub();
         } finally {
-            TakeoverMetadataNodeResponseMessage reponse = new TakeoverMetadataNodeResponseMessage(
-                    appContext.getTransactionSubsystem().getId());
+            TakeoverMetadataNodeResponseMessage reponse =
+                    new TakeoverMetadataNodeResponseMessage(appContext.getTransactionSubsystem().getId());
             sendMessage(reponse, null);
         }
     }
@@ -189,15 +194,15 @@ public class NCMessageBroker implements INCMessageBroker {
         }
 
         //mark the partitions to be closed as inactive
-        PersistentLocalResourceRepository localResourceRepo = (PersistentLocalResourceRepository) appContext
-                .getLocalResourceRepository();
+        PersistentLocalResourceRepository localResourceRepo =
+                (PersistentLocalResourceRepository) appContext.getLocalResourceRepository();
         for (Integer partitionId : msg.getPartitions()) {
             localResourceRepo.addInactivePartition(partitionId);
         }
 
         //send response after partitions prepared for failback
-        PreparePartitionsFailbackResponseMessage reponse = new PreparePartitionsFailbackResponseMessage(msg.getPlanId(),
-                msg.getRequestId(), msg.getPartitions());
+        PreparePartitionsFailbackResponseMessage reponse =
+                new PreparePartitionsFailbackResponseMessage(msg.getPlanId(), msg.getRequestId(), msg.getPartitions());
         sendMessage(reponse, null);
     }
 
@@ -207,8 +212,8 @@ public class NCMessageBroker implements INCMessageBroker {
             IRemoteRecoveryManager remoteRecoeryManager = appContext.getRemoteRecoveryManager();
             remoteRecoeryManager.completeFailbackProcess();
         } finally {
-            CompleteFailbackResponseMessage reponse = new CompleteFailbackResponseMessage(msg.getPlanId(),
-                    msg.getRequestId(), msg.getPartitions());
+            CompleteFailbackResponseMessage reponse =
+                    new CompleteFailbackResponseMessage(msg.getPlanId(), msg.getRequestId(), msg.getPartitions());
             sendMessage(reponse, null);
         }
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-app/src/main/java/org/apache/asterix/result/ResultPrinter.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/result/ResultPrinter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/result/ResultPrinter.java
deleted file mode 100644
index 8e16ef7..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/result/ResultPrinter.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.result;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.nio.ByteBuffer;
-
-import org.apache.asterix.api.common.SessionConfig;
-import org.apache.asterix.common.utils.JSONUtil;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksAppendable;
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.IFrameTupleAccessor;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.control.nc.resources.memory.FrameManager;
-
-public class ResultPrinter {
-
-    // TODO(tillw): Should this be static?
-    private static FrameManager resultDisplayFrameMgr = new FrameManager(ResultReader.FRAME_SIZE);
-
-    private final SessionConfig conf;
-    private final ResultUtils.Stats stats;
-    private final ARecordType recordType;
-
-    private boolean indentJSON;
-    private boolean quoteRecord;
-
-    // Whether we are wrapping the output sequence in an array
-    private boolean wrapArray = false;
-    // Whether this is the first instance being output
-    private boolean notFirst = false;
-
-    public ResultPrinter(SessionConfig conf, ResultUtils.Stats stats, ARecordType recordType) {
-        this.conf = conf;
-        this.stats = stats;
-        this.recordType = recordType;
-        this.indentJSON = conf.is(SessionConfig.FORMAT_INDENT_JSON);
-        this.quoteRecord = conf.is(SessionConfig.FORMAT_QUOTE_RECORD);
-    }
-
-    private static void appendCSVHeader(Appendable app, ARecordType recordType) throws HyracksDataException {
-        try {
-            String[] fieldNames = recordType.getFieldNames();
-            boolean notfirst = false;
-            for (String name : fieldNames) {
-                if (notfirst) {
-                    app.append(',');
-                }
-                notfirst = true;
-                app.append('"').append(name.replace("\"", "\"\"")).append('"');
-            }
-            app.append("\r\n");
-        } catch (IOException e) {
-            throw new HyracksDataException(e);
-        }
-    }
-
-    private void printPrefix() throws HyracksDataException {
-        // If we're outputting CSV with a header, the HTML header was already
-        // output by displayCSVHeader(), so skip it here
-        if (conf.is(SessionConfig.FORMAT_HTML)) {
-            conf.out().println("<h4>Results:</h4>");
-            conf.out().println("<pre>");
-        }
-
-        try {
-            conf.resultPrefix(new AlgebricksAppendable(conf.out()));
-        } catch (AlgebricksException e) {
-            throw new HyracksDataException(e);
-        }
-
-        if (conf.is(SessionConfig.FORMAT_WRAPPER_ARRAY)) {
-            conf.out().print("[ ");
-            wrapArray = true;
-        }
-
-        if (conf.fmt() == SessionConfig.OutputFormat.CSV && conf.is(SessionConfig.FORMAT_CSV_HEADER)) {
-            if (recordType == null) {
-                throw new HyracksDataException("Cannot print CSV with header without specifying output-record-type");
-            }
-            if (quoteRecord) {
-                StringWriter sw = new StringWriter();
-                appendCSVHeader(sw, recordType);
-                conf.out().print(JSONUtil.quoteAndEscape(sw.toString()));
-                conf.out().print("\n");
-                notFirst = true;
-            } else {
-                appendCSVHeader(conf.out(), recordType);
-            }
-        }
-    }
-
-    private void printPostfix() throws HyracksDataException {
-        conf.out().flush();
-        if (wrapArray) {
-            conf.out().println(" ]");
-        }
-        try {
-            conf.resultPostfix(new AlgebricksAppendable(conf.out()));
-        } catch (AlgebricksException e) {
-            throw new HyracksDataException(e);
-        }
-        if (conf.is(SessionConfig.FORMAT_HTML)) {
-            conf.out().println("</pre>");
-        }
-    }
-
-    private void displayRecord(String result) {
-        String record = result;
-        if (indentJSON) {
-            // TODO(tillw): this is inefficient - do this during record generation
-            record = JSONUtil.indent(record, 2);
-        }
-        if (conf.fmt() == SessionConfig.OutputFormat.CSV) {
-            // TODO(tillw): this is inefficient as well
-            record = record + "\r\n";
-        }
-        if (quoteRecord) {
-            // TODO(tillw): this is inefficient as well
-            record = JSONUtil.quoteAndEscape(record);
-        }
-        conf.out().print(record);
-        ++stats.count;
-        // TODO(tillw) fix this approximation
-        stats.size += record.length();
-    }
-
-    public void print(String record) throws HyracksDataException {
-        printPrefix();
-        // TODO(tillw) evil hack
-        quoteRecord = true;
-        displayRecord(record);
-        printPostfix();
-    }
-
-    public void print(ResultReader resultReader) throws HyracksDataException {
-        printPrefix();
-
-        final IFrameTupleAccessor fta = resultReader.getFrameTupleAccessor();
-        final IFrame frame = new VSizeFrame(resultDisplayFrameMgr);
-
-        while (resultReader.read(frame) > 0) {
-            final ByteBuffer frameBuffer = frame.getBuffer();
-            final byte[] frameBytes = frameBuffer.array();
-            fta.reset(frameBuffer);
-            final int last = fta.getTupleCount();
-            for (int tIndex = 0; tIndex < last; tIndex++) {
-                final int start = fta.getTupleStartOffset(tIndex);
-                int length = fta.getTupleEndOffset(tIndex) - start;
-                if (conf.fmt() == SessionConfig.OutputFormat.CSV
-                        && ((length > 0) && (frameBytes[start + length - 1] == '\n'))) {
-                    length--;
-                }
-                String result = new String(frameBytes, start, length, UTF_8);
-                if (wrapArray && notFirst) {
-                    conf.out().print(", ");
-                }
-                notFirst = true;
-                displayRecord(result);
-            }
-            frameBuffer.clear();
-        }
-
-        printPostfix();
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-app/src/main/java/org/apache/asterix/result/ResultReader.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/result/ResultReader.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/result/ResultReader.java
deleted file mode 100644
index 373c0d0..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/result/ResultReader.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.result;
-
-import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.IFrameTupleAccessor;
-import org.apache.hyracks.api.dataset.DatasetJobRecord.Status;
-import org.apache.hyracks.api.dataset.IHyracksDataset;
-import org.apache.hyracks.api.dataset.IHyracksDatasetReader;
-import org.apache.hyracks.api.dataset.ResultSetId;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.dataflow.common.comm.io.ResultFrameTupleAccessor;
-
-public class ResultReader {
-    private final IHyracksDataset hyracksDataset;
-
-    private IHyracksDatasetReader reader;
-
-    private IFrameTupleAccessor frameTupleAccessor;
-
-    // Number of parallel result reader buffers
-    public static final int NUM_READERS = 1;
-
-    public static final int FRAME_SIZE = AsterixAppContextInfo.getInstance().getCompilerProperties().getFrameSize();
-
-    public ResultReader(IHyracksClientConnection hcc, IHyracksDataset hdc) throws Exception {
-        hyracksDataset = hdc;
-    }
-
-    public void open(JobId jobId, ResultSetId resultSetId) throws HyracksDataException {
-        reader = hyracksDataset.createReader(jobId, resultSetId);
-        frameTupleAccessor = new ResultFrameTupleAccessor();
-    }
-
-    public Status getStatus() {
-        return reader.getResultStatus();
-    }
-
-    public int read(IFrame frame) throws HyracksDataException {
-        return reader.read(frame);
-    }
-
-    public IFrameTupleAccessor getFrameTupleAccessor() {
-        return frameTupleAccessor;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-app/src/main/java/org/apache/asterix/result/ResultUtils.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/result/ResultUtils.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/result/ResultUtils.java
deleted file mode 100644
index 3503549..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/result/ResultUtils.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.result;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.nio.charset.Charset;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.asterix.api.common.SessionConfig;
-import org.apache.asterix.api.http.servlet.APIServlet;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.http.ParseException;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-
-public class ResultUtils {
-    static Map<Character, String> HTML_ENTITIES = new HashMap<Character, String>();
-
-    static {
-        HTML_ENTITIES.put('"', "&quot;");
-        HTML_ENTITIES.put('&', "&amp;");
-        HTML_ENTITIES.put('<', "&lt;");
-        HTML_ENTITIES.put('>', "&gt;");
-    }
-
-    public static class Stats {
-        public long count;
-        public long size;
-    }
-
-    public static String escapeHTML(String s) {
-        for (Character c : HTML_ENTITIES.keySet()) {
-            if (s.indexOf(c) >= 0) {
-                s = s.replace(c.toString(), HTML_ENTITIES.get(c));
-            }
-        }
-        return s;
-    }
-
-    public static void displayResults(ResultReader resultReader, SessionConfig conf, Stats stats,
-            ARecordType recordType) throws HyracksDataException {
-        new ResultPrinter(conf, stats, recordType).print(resultReader);
-    }
-
-    public static void displayResults(String record, SessionConfig conf, Stats stats, ARecordType recordType)
-            throws HyracksDataException {
-        new ResultPrinter(conf, stats, recordType).print(record);
-    }
-
-    public static JSONObject getErrorResponse(int errorCode, String errorMessage, String errorSummary,
-            String errorStackTrace) {
-        JSONObject errorResp = new JSONObject();
-        JSONArray errorArray = new JSONArray();
-        errorArray.put(errorCode);
-        errorArray.put(errorMessage);
-        try {
-            errorResp.put("error-code", errorArray);
-            if (!"".equals(errorSummary)) {
-                errorResp.put("summary", errorSummary);
-            } else {
-                //parse exception
-                errorResp.put("summary", errorMessage);
-            }
-            errorResp.put("stacktrace", errorStackTrace);
-        } catch (JSONException e) {
-            // TODO(madhusudancs): Figure out what to do when JSONException occurs while building the results.
-        }
-        return errorResp;
-    }
-
-    public static void webUIErrorHandler(PrintWriter out, Exception e) {
-        String errorTemplate = readTemplateFile("/webui/errortemplate.html", "%s\n%s\n%s");
-
-        String errorOutput = String.format(errorTemplate, escapeHTML(extractErrorMessage(e)),
-                escapeHTML(extractErrorSummary(e)), escapeHTML(extractFullStackTrace(e)));
-        out.println(errorOutput);
-    }
-
-    public static void webUIParseExceptionHandler(PrintWriter out, Throwable e, String query) {
-        String errorTemplate = readTemplateFile("/webui/errortemplate_message.html", "<pre class=\"error\">%s\n</pre>");
-
-        String errorOutput = String.format(errorTemplate, buildParseExceptionMessage(e, query));
-        out.println(errorOutput);
-    }
-
-    public static void apiErrorHandler(PrintWriter out, Exception e) {
-        int errorCode = 99;
-        if (e instanceof ParseException) {
-            errorCode = 2;
-        } else if (e instanceof AlgebricksException) {
-            errorCode = 3;
-        } else if (e instanceof HyracksDataException) {
-            errorCode = 4;
-        }
-
-        JSONObject errorResp = ResultUtils.getErrorResponse(errorCode, extractErrorMessage(e), extractErrorSummary(e),
-                extractFullStackTrace(e));
-        out.write(errorResp.toString());
-    }
-
-    public static String buildParseExceptionMessage(Throwable e, String query) {
-        StringBuilder errorMessage = new StringBuilder();
-        String message = e.getMessage();
-        message = message.replace("<", "&lt");
-        message = message.replace(">", "&gt");
-        errorMessage.append("Error: " + message + "\n");
-        int pos = message.indexOf("line");
-        if (pos > 0) {
-            Pattern p = Pattern.compile("\\d+");
-            Matcher m = p.matcher(message);
-            if (m.find(pos)) {
-                int lineNo = Integer.parseInt(message.substring(m.start(), m.end()));
-                String[] lines = query.split("\n");
-                if (lineNo > lines.length) {
-                    errorMessage.append("===> &ltBLANK LINE&gt \n");
-                } else {
-                    String line = lines[lineNo - 1];
-                    errorMessage.append("==> " + line);
-                }
-            }
-        }
-        return errorMessage.toString();
-    }
-
-    private static Throwable getRootCause(Throwable cause) {
-        Throwable nextCause = cause.getCause();
-        while (nextCause != null) {
-            cause = nextCause;
-            nextCause = cause.getCause();
-        }
-        return cause;
-    }
-
-    /**
-     * Extract the message in the root cause of the stack trace:
-     *
-     * @param e
-     * @return error message string.
-     */
-    private static String extractErrorMessage(Throwable e) {
-        Throwable cause = getRootCause(e);
-        String fullyQualifiedExceptionClassName = cause.getClass().getName();
-        String[] hierarchySplits = fullyQualifiedExceptionClassName.split("\\.");
-        //try returning the class without package qualification
-        String exceptionClassName = hierarchySplits[hierarchySplits.length - 1];
-        String localizedMessage = cause.getLocalizedMessage();
-        if (localizedMessage == null) {
-            localizedMessage = "Internal error. Please check instance logs for further details.";
-        }
-        return localizedMessage + " [" + exceptionClassName + "]";
-    }
-
-    /**
-     * Extract the meaningful part of a stack trace:
-     * a. the causes in the stack trace hierarchy
-     * b. the top exception for each cause
-     *
-     * @param e
-     * @return the contacted message containing a and b.
-     */
-    private static String extractErrorSummary(Throwable e) {
-        StringBuilder errorMessageBuilder = new StringBuilder();
-        Throwable cause = e;
-        errorMessageBuilder.append(cause.getLocalizedMessage());
-        while (cause != null) {
-            StackTraceElement[] stackTraceElements = cause.getStackTrace();
-            errorMessageBuilder.append(stackTraceElements.length > 0 ? "\n caused by: " + stackTraceElements[0] : "");
-            cause = cause.getCause();
-        }
-        return errorMessageBuilder.toString();
-    }
-
-    /**
-     * Extract the full stack trace:
-     *
-     * @param e
-     * @return the string containing the full stack trace of the error.
-     */
-    public static String extractFullStackTrace(Throwable e) {
-        StringWriter stringWriter = new StringWriter();
-        PrintWriter printWriter = new PrintWriter(stringWriter);
-        e.printStackTrace(printWriter);
-        return stringWriter.toString();
-    }
-
-    /**
-     * Read the template file which is stored as a resource and return its content. If the file does not exist or is
-     * not readable return the default template string.
-     *
-     * @param path
-     *            The path to the resource template file
-     * @param defaultTemplate
-     *            The default template string if the template file does not exist or is not readable
-     * @return The template string to be used to render the output.
-     */
-    private static String readTemplateFile(String path, String defaultTemplate) {
-        String errorTemplate = defaultTemplate;
-        try {
-            String resourcePath = "/webui/errortemplate_message.html";
-            InputStream is = APIServlet.class.getResourceAsStream(resourcePath);
-            InputStreamReader isr = new InputStreamReader(is);
-            StringBuilder sb = new StringBuilder();
-            BufferedReader br = new BufferedReader(isr);
-            String line = br.readLine();
-
-            while (line != null) {
-                sb.append(line);
-                line = br.readLine();
-            }
-            errorTemplate = sb.toString();
-        } catch (IOException ioe) {
-            // If there is an IOException reading the error template html file, default value of error template is used.
-        }
-        return errorTemplate;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index 596dc41..7e35f11 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -23,9 +23,8 @@ import java.util.Map;
 import java.util.logging.Logger;
 
 import org.apache.asterix.algebra.operators.physical.CommitRuntime;
-import org.apache.asterix.api.common.AsterixAppRuntimeContext;
-import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
 import org.apache.asterix.app.external.TestLibrarian;
+import org.apache.asterix.app.nc.AsterixNCAppRuntimeContext;
 import org.apache.asterix.common.api.ILocalResourceMetadata;
 import org.apache.asterix.common.config.AsterixTransactionProperties;
 import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider;
@@ -97,8 +96,8 @@ public class TestNodeController {
     protected static final Logger LOGGER = Logger.getLogger(TestNodeController.class.getName());
 
     protected static final String PATH_ACTUAL = "unittest" + File.separator;
-    protected static final String PATH_BASE =
-            StringUtils.join(new String[] { "src", "test", "resources", "nodetests" }, File.separator);
+    protected static final String PATH_BASE = StringUtils.join(new String[] { "src", "test", "resources", "nodetests" },
+            File.separator);
 
     protected static final String TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml";
     protected static AsterixTransactionProperties txnProperties;
@@ -158,18 +157,18 @@ public class TestNodeController {
         PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType,
                 mergePolicyFactory, mergePolicyProperties, filterFields);
         IndexOperation op = IndexOperation.INSERT;
-        IModificationOperationCallbackFactory modOpCallbackFactory =
-                new PrimaryIndexModificationOperationCallbackFactory(getTxnJobId(), dataset.getDatasetId(),
-                        primaryIndexInfo.primaryKeyIndexes, TXN_SUBSYSTEM_PROVIDER, op, ResourceType.LSM_BTREE, true);
-        AsterixLSMTreeInsertDeleteOperatorDescriptor indexOpDesc =
-                getInsertOpratorDesc(primaryIndexInfo, modOpCallbackFactory);
-        LSMBTreeDataflowHelperFactory dataflowHelperFactory =
-                getPrimaryIndexDataflowHelperFactory(ctx, primaryIndexInfo);
+        IModificationOperationCallbackFactory modOpCallbackFactory = new PrimaryIndexModificationOperationCallbackFactory(
+                getTxnJobId(), dataset.getDatasetId(), primaryIndexInfo.primaryKeyIndexes, TXN_SUBSYSTEM_PROVIDER, op,
+                ResourceType.LSM_BTREE, true);
+        AsterixLSMTreeInsertDeleteOperatorDescriptor indexOpDesc = getInsertOpratorDesc(primaryIndexInfo,
+                modOpCallbackFactory);
+        LSMBTreeDataflowHelperFactory dataflowHelperFactory = getPrimaryIndexDataflowHelperFactory(ctx,
+                primaryIndexInfo);
         Mockito.when(indexOpDesc.getIndexDataflowHelperFactory()).thenReturn(dataflowHelperFactory);
         IRecordDescriptorProvider recordDescProvider = primaryIndexInfo.getInsertRecordDescriptorProvider();
-        AsterixLSMInsertDeleteOperatorNodePushable insertOp =
-                new AsterixLSMInsertDeleteOperatorNodePushable(indexOpDesc, ctx, PARTITION,
-                        primaryIndexInfo.primaryIndexInsertFieldsPermutations, recordDescProvider, op, true);
+        AsterixLSMInsertDeleteOperatorNodePushable insertOp = new AsterixLSMInsertDeleteOperatorNodePushable(
+                indexOpDesc, ctx, PARTITION, primaryIndexInfo.primaryIndexInsertFieldsPermutations, recordDescProvider,
+                op, true);
         CommitRuntime commitOp = new CommitRuntime(ctx, getTxnJobId(), dataset.getDatasetId(),
                 primaryIndexInfo.primaryKeyIndexes, false, true, PARTITION);
         insertOp.setOutputFrameWriter(0, commitOp, primaryIndexInfo.rDesc);
@@ -185,8 +184,8 @@ public class TestNodeController {
         JobSpecification spec = new JobSpecification();
         PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType,
                 mergePolicyFactory, mergePolicyProperties, filterFields);
-        LSMBTreeDataflowHelperFactory indexDataflowHelperFactory =
-                getPrimaryIndexDataflowHelperFactory(ctx, primaryIndexInfo);
+        LSMBTreeDataflowHelperFactory indexDataflowHelperFactory = getPrimaryIndexDataflowHelperFactory(ctx,
+                primaryIndexInfo);
         BTreeSearchOperatorDescriptor searchOpDesc = new BTreeSearchOperatorDescriptor(spec, primaryIndexInfo.rDesc,
                 AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
                 primaryIndexInfo.fileSplitProvider, primaryIndexInfo.primaryIndexTypeTraits,
@@ -214,8 +213,8 @@ public class TestNodeController {
 
     public AsterixLSMTreeInsertDeleteOperatorDescriptor getInsertOpratorDesc(PrimaryIndexInfo primaryIndexInfo,
             IModificationOperationCallbackFactory modOpCallbackFactory) {
-        AsterixLSMTreeInsertDeleteOperatorDescriptor indexOpDesc =
-                Mockito.mock(AsterixLSMTreeInsertDeleteOperatorDescriptor.class);
+        AsterixLSMTreeInsertDeleteOperatorDescriptor indexOpDesc = Mockito
+                .mock(AsterixLSMTreeInsertDeleteOperatorDescriptor.class);
         Mockito.when(indexOpDesc.getLifecycleManagerProvider())
                 .thenReturn(AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER);
         Mockito.when(indexOpDesc.getStorageManager()).thenReturn(AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER);
@@ -258,12 +257,12 @@ public class TestNodeController {
             int[] primaryIndexBloomFilterKeyFields, ILSMMergePolicyFactory mergePolicyFactory,
             Map<String, String> mergePolicyProperties, ITypeTraits[] filterTypeTraits,
             IBinaryComparatorFactory[] filterCmpFactories, int[] btreeFields, int[] filterFields) {
-        ILocalResourceMetadata localResourceMetadata =
-                new LSMBTreeLocalResourceMetadata(primaryIndexTypeTraits, primaryIndexComparatorFactories,
-                        primaryIndexBloomFilterKeyFields, true, dataset.getDatasetId(), mergePolicyFactory,
-                        mergePolicyProperties, filterTypeTraits, filterCmpFactories, btreeFields, filterFields);
-        ILocalResourceFactoryProvider localResourceFactoryProvider =
-                new PersistentLocalResourceFactoryProvider(localResourceMetadata, LocalResource.LSMBTreeResource);
+        ILocalResourceMetadata localResourceMetadata = new LSMBTreeLocalResourceMetadata(primaryIndexTypeTraits,
+                primaryIndexComparatorFactories, primaryIndexBloomFilterKeyFields, true, dataset.getDatasetId(),
+                mergePolicyFactory, mergePolicyProperties, filterTypeTraits, filterCmpFactories, btreeFields,
+                filterFields);
+        ILocalResourceFactoryProvider localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(
+                localResourceMetadata, LocalResource.LSMBTreeResource);
         return localResourceFactoryProvider;
     }
 
@@ -277,8 +276,8 @@ public class TestNodeController {
                 AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
                 BLOOM_FILTER_FALSE_POSITIVE_RATE, true, primaryIndexInfo.filterTypeTraits,
                 primaryIndexInfo.filterCmpFactories, primaryIndexInfo.btreeFields, primaryIndexInfo.filterFields, true);
-        IndexDataflowHelper dataflowHelper =
-                dataflowHelperFactory.createIndexDataflowHelper(indexOpDesc, ctx, PARTITION);
+        IndexDataflowHelper dataflowHelper = dataflowHelperFactory.createIndexDataflowHelper(indexOpDesc, ctx,
+                PARTITION);
         return (LSMBTreeDataflowHelper) dataflowHelper;
     }
 
@@ -309,8 +308,8 @@ public class TestNodeController {
         PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType,
                 mergePolicyFactory, mergePolicyProperties, filterFields);
         TreeIndexCreateOperatorDescriptor indexOpDesc = getIndexCreateOpDesc(primaryIndexInfo);
-        LSMBTreeDataflowHelper dataflowHelper =
-                getPrimaryIndexDataflowHelper(createTestContext(), primaryIndexInfo, indexOpDesc);
+        LSMBTreeDataflowHelper dataflowHelper = getPrimaryIndexDataflowHelper(createTestContext(), primaryIndexInfo,
+                indexOpDesc);
         dataflowHelper.create();
     }
 
@@ -323,11 +322,10 @@ public class TestNodeController {
     }
 
     private IBinaryComparatorFactory[] createPrimaryIndexComparatorFactories(IAType[] primaryKeyTypes) {
-        IBinaryComparatorFactory[] primaryIndexComparatorFactories =
-                new IBinaryComparatorFactory[primaryKeyTypes.length];
+        IBinaryComparatorFactory[] primaryIndexComparatorFactories = new IBinaryComparatorFactory[primaryKeyTypes.length];
         for (int j = 0; j < primaryKeyTypes.length; ++j) {
-            primaryIndexComparatorFactories[j] =
-                    AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(primaryKeyTypes[j], true);
+            primaryIndexComparatorFactories[j] = AqlBinaryComparatorFactoryProvider.INSTANCE
+                    .getBinaryComparatorFactory(primaryKeyTypes[j], true);
         }
         return primaryIndexComparatorFactories;
     }
@@ -337,8 +335,8 @@ public class TestNodeController {
         int i = 0;
         ISerializerDeserializer<?>[] primaryIndexSerdes = new ISerializerDeserializer<?>[primaryIndexNumOfTupleFields];
         for (; i < primaryKeyTypes.length; i++) {
-            primaryIndexSerdes[i] =
-                    AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(primaryKeyTypes[i]);
+            primaryIndexSerdes[i] = AqlSerializerDeserializerProvider.INSTANCE
+                    .getSerializerDeserializer(primaryKeyTypes[i]);
         }
         primaryIndexSerdes[i++] = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(recordType);
         if (metaType != null) {
@@ -372,7 +370,7 @@ public class TestNodeController {
     }
 
     public TransactionSubsystem getTransactionSubsystem() {
-        return (TransactionSubsystem) ((AsterixAppRuntimeContext) ExecutionTestUtil.integrationUtil.ncs[0]
+        return (TransactionSubsystem) ((AsterixNCAppRuntimeContext) ExecutionTestUtil.integrationUtil.ncs[0]
                 .getApplicationContext().getApplicationObject()).getTransactionSubsystem();
     }
 
@@ -380,8 +378,8 @@ public class TestNodeController {
         return getTransactionSubsystem().getTransactionManager();
     }
 
-    public AsterixAppRuntimeContext getAppRuntimeContext() {
-        return (AsterixAppRuntimeContext) ExecutionTestUtil.integrationUtil.ncs[0].getApplicationContext()
+    public AsterixNCAppRuntimeContext getAppRuntimeContext() {
+        return (AsterixNCAppRuntimeContext) ExecutionTestUtil.integrationUtil.ncs[0].getApplicationContext()
                 .getApplicationObject();
     }
 
@@ -423,8 +421,8 @@ public class TestNodeController {
             this.mergePolicyProperties = mergePolicyProperties;
             this.filterFields = filterFields;
             primaryIndexNumOfTupleFields = primaryKeyTypes.length + (1 + ((metaType == null) ? 0 : 1));
-            primaryIndexTypeTraits =
-                    createPrimaryIndexTypeTraits(primaryIndexNumOfTupleFields, primaryKeyTypes, recordType, metaType);
+            primaryIndexTypeTraits = createPrimaryIndexTypeTraits(primaryIndexNumOfTupleFields, primaryKeyTypes,
+                    recordType, metaType);
             primaryIndexComparatorFactories = createPrimaryIndexComparatorFactories(primaryKeyTypes);
             primaryIndexBloomFilterKeyFields = createPrimaryIndexBloomFilterFields(primaryKeyTypes.length);
             filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recordType);
@@ -435,8 +433,8 @@ public class TestNodeController {
                     primaryIndexComparatorFactories, primaryIndexBloomFilterKeyFields, mergePolicyFactory,
                     mergePolicyProperties, filterTypeTraits, filterCmpFactories, btreeFields, filterFields);
             fileSplitProvider = getFileSplitProvider(dataset);
-            primaryIndexSerdes =
-                    createPrimaryIndexSerdes(primaryIndexNumOfTupleFields, primaryKeyTypes, recordType, metaType);
+            primaryIndexSerdes = createPrimaryIndexSerdes(primaryIndexNumOfTupleFields, primaryKeyTypes, recordType,
+                    metaType);
             rDesc = new RecordDescriptor(primaryIndexSerdes, primaryIndexTypeTraits);
             primaryIndexInsertFieldsPermutations = new int[primaryIndexNumOfTupleFields];
             for (int i = 0; i < primaryIndexNumOfTupleFields; i++) {
@@ -459,8 +457,8 @@ public class TestNodeController {
             ISerializerDeserializer<?>[] primaryKeySerdes = new ISerializerDeserializer<?>[primaryKeyTypes.length];
             for (int i = 0; i < primaryKeyTypes.length; i++) {
                 primaryKeyTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(primaryKeyTypes[i]);
-                primaryKeySerdes[i] =
-                        AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(primaryKeyTypes[i]);
+                primaryKeySerdes[i] = AqlSerializerDeserializerProvider.INSTANCE
+                        .getSerializerDeserializer(primaryKeyTypes[i]);
             }
             RecordDescriptor searcgRecDesc = new RecordDescriptor(primaryKeySerdes, primaryKeyTypeTraits);
             IRecordDescriptorProvider rDescProvider = Mockito.mock(IRecordDescriptorProvider.class);
@@ -472,10 +470,10 @@ public class TestNodeController {
 
     public RecordDescriptor getSearchOutputDesc(IAType[] keyTypes, ARecordType recordType, ARecordType metaType) {
         int primaryIndexNumOfTupleFields = keyTypes.length + (1 + ((metaType == null) ? 0 : 1));
-        ITypeTraits[] primaryIndexTypeTraits =
-                createPrimaryIndexTypeTraits(primaryIndexNumOfTupleFields, keyTypes, recordType, metaType);
-        ISerializerDeserializer<?>[] primaryIndexSerdes =
-                createPrimaryIndexSerdes(primaryIndexNumOfTupleFields, keyTypes, recordType, metaType);
+        ITypeTraits[] primaryIndexTypeTraits = createPrimaryIndexTypeTraits(primaryIndexNumOfTupleFields, keyTypes,
+                recordType, metaType);
+        ISerializerDeserializer<?>[] primaryIndexSerdes = createPrimaryIndexSerdes(primaryIndexNumOfTupleFields,
+                keyTypes, recordType, metaType);
         return new RecordDescriptor(primaryIndexSerdes, primaryIndexTypeTraits);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java
index 532cea9..3591509 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java
@@ -28,7 +28,8 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.asterix.api.common.SessionConfig;
+import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
+import org.apache.asterix.common.app.SessionConfig;
 import org.apache.asterix.common.config.AsterixExternalProperties;
 import org.apache.asterix.compiler.provider.AqlCompilationProvider;
 import org.apache.asterix.event.schema.cluster.Cluster;
@@ -37,6 +38,7 @@ import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.lang.common.statement.RunStatement;
 import org.apache.asterix.om.util.AsterixAppContextInfo;
 import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.asterix.translator.IStatementExecutor;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -67,7 +69,7 @@ public class QueryTranslatorTest {
         when(mockCluster.getMasterNode()).thenReturn(mockMasterNode);
         when(mockMasterNode.getClientIp()).thenReturn("127.0.0.1");
 
-        QueryTranslator aqlTranslator = new QueryTranslator(statements, mockSessionConfig,
+        IStatementExecutor aqlTranslator = new DefaultStatementExecutorFactory(null).create(statements, mockSessionConfig,
                 new AqlCompilationProvider());
         List<String> parameters = new ArrayList<String>();
         parameters.add("examples/pregelix-example-jar-with-dependencies.jar");

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dml/DmlTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dml/DmlTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dml/DmlTest.java
index 84fb2b3..34dab28 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dml/DmlTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dml/DmlTest.java
@@ -27,6 +27,7 @@ import java.io.Reader;
 
 import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
 import org.apache.asterix.api.java.AsterixJavaClient;
+import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.compiler.provider.AqlCompilationProvider;
 import org.apache.asterix.test.aql.TestExecutor;
@@ -59,9 +60,8 @@ public class DmlTest {
         integrationUtil.init(true);
         Reader loadReader = new BufferedReader(
                 new InputStreamReader(new FileInputStream(LOAD_FOR_ENLIST_FILE), "UTF-8"));
-        AsterixJavaClient asterixLoad = new AsterixJavaClient(
-                integrationUtil.getHyracksClientConnection(), loadReader, ERR,
-                new AqlCompilationProvider());
+        AsterixJavaClient asterixLoad = new AsterixJavaClient(integrationUtil.getHyracksClientConnection(), loadReader,
+                ERR, new AqlCompilationProvider(), new DefaultStatementExecutorFactory(null));
         try {
             asterixLoad.compile(true, false, false, false, false, true, false);
         } catch (AsterixException e) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/optimizer/OptimizerTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/optimizer/OptimizerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/optimizer/OptimizerTest.java
index b29c0b0..1f64868 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/optimizer/OptimizerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/optimizer/OptimizerTest.java
@@ -30,6 +30,7 @@ import java.util.logging.Logger;
 
 import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
 import org.apache.asterix.api.java.AsterixJavaClient;
+import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
 import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.compiler.provider.AqlCompilationProvider;
@@ -171,7 +172,8 @@ public class OptimizerTest {
             ILangCompilationProvider provider = queryFile.getName().endsWith("aql") ? aqlCompilationProvider
                     : sqlppCompilationProvider;
             IHyracksClientConnection hcc = integrationUtil.getHyracksClientConnection();
-            AsterixJavaClient asterix = new AsterixJavaClient(hcc, query, plan, provider);
+            AsterixJavaClient asterix = new AsterixJavaClient(hcc, query, plan, provider,
+                    new DefaultStatementExecutorFactory(null));
             try {
                 asterix.compile(true, false, false, true, true, false, false);
             } catch (AsterixException e) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
index 545faed..3195c39 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
@@ -49,11 +49,15 @@ public class ExecutionTestUtil {
     public static AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil();
 
     public static List<ILibraryManager> setUp(boolean cleanup) throws Exception {
+        return setUp(cleanup, TEST_CONFIG_FILE_NAME);
+    }
+
+    public static List<ILibraryManager> setUp(boolean cleanup, String configFile) throws Exception {
         System.out.println("Starting setup");
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Starting setup");
         }
-        System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, TEST_CONFIG_FILE_NAME);
+        System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, configFile);
 
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("initializing pseudo cluster");
@@ -80,8 +84,8 @@ public class ExecutionTestUtil {
         libraryManagers.add(AsterixAppContextInfo.getInstance().getLibraryManager());
         // Adds library managers for NCs, one-per-NC.
         for (NodeControllerService nc : integrationUtil.ncs) {
-            IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) nc.getApplicationContext()
-                    .getApplicationObject();
+            IAsterixAppRuntimeContext runtimeCtx =
+                    (IAsterixAppRuntimeContext) nc.getApplicationContext().getApplicationObject();
             libraryManagers.add(runtimeCtx.getLibraryManager());
         }
         return libraryManagers;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/HDFSCluster.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/HDFSCluster.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/HDFSCluster.java
index d3dc541..74c7091 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/HDFSCluster.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/HDFSCluster.java
@@ -25,16 +25,17 @@ import org.apache.asterix.external.dataset.adapter.GenericAdapter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
 /**
  * Manages a Mini (local VM) HDFS cluster with a configured number of datanodes.
- * @author ramangrover29
  */
 public class HDFSCluster {
 
@@ -90,10 +91,12 @@ public class HDFSCluster {
         Path destDir = new Path(HDFS_PATH);
         dfs.mkdirs(destDir);
         File srcDir = new File(localDataRoot + DATA_PATH);
-        File[] listOfFiles = srcDir.listFiles();
-        for (File srcFile : listOfFiles) {
-            Path path = new Path(srcFile.getAbsolutePath());
-            dfs.copyFromLocalFile(path, destDir);
+        if (srcDir.exists()) {
+            File[] listOfFiles = srcDir.listFiles();
+            for (File srcFile : listOfFiles) {
+                Path path = new Path(srcFile.getAbsolutePath());
+                dfs.copyFromLocalFile(path, destDir);
+            }
         }
     }
 
@@ -110,6 +113,7 @@ public class HDFSCluster {
             cleanupLocal();
         }
     }
+
     public static void main(String[] args) throws Exception {
         HDFSCluster cluster = new HDFSCluster();
         cluster.setup();
@@ -125,10 +129,10 @@ public class HDFSCluster {
         String hdfsUrl = "hdfs://127.0.0.1:31888";
         String hdfsPath = "/asterix/extrasmalltweets.txt";
         conf.set("fs.default.name", hdfsUrl);
-        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
+        conf.set("fs.hdfs.impl", DistributedFileSystem.class.getName());
         conf.setClassLoader(GenericAdapter.class.getClassLoader());
         conf.set("mapred.input.dir", hdfsPath);
-        conf.set("mapred.input.format.class", "org.apache.hadoop.mapred.TextInputFormat");
+        conf.set("mapred.input.format.class", TextInputFormat.class.getName());
         return conf;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ExtensionId.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ExtensionId.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ExtensionId.java
new file mode 100644
index 0000000..1cd0ce7
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ExtensionId.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.api;
+
+import java.io.Serializable;
+
+import org.apache.commons.lang3.ObjectUtils;
+
+/**
+ * Identifies an extension by a name and a version.
+ * Two ids are equal when both the name and the version match.
+ */
+public class ExtensionId implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+    private final String name;
+    private final int version;
+
+    /**
+     * @param name
+     *            the extension name
+     * @param version
+     *            the extension version
+     */
+    public ExtensionId(String name, int version) {
+        this.name = name;
+        this.version = version;
+    }
+
+    /**
+     * Hashes the same two fields that {@link #equals(Object)} compares.
+     * The name is passed as an object rather than dereferenced, so a null
+     * name (which equals() compares null-safely) hashes instead of throwing.
+     */
+    @Override
+    public int hashCode() {
+        return ObjectUtils.hashCodeMulti(name, version);
+    }
+
+    /**
+     * @return true if {@code o} is an ExtensionId with the same name and version
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        } else if (o instanceof ExtensionId) {
+            ExtensionId oExtensionId = (ExtensionId) o;
+            return version == oExtensionId.version && ObjectUtils.equals(name, oExtensionId.getName());
+        }
+        return false;
+    }
+
+    /**
+     * @return the extension version
+     */
+    public int getVersion() {
+        return version;
+    }
+
+    /**
+     * @return the extension name
+     */
+    public String getName() {
+        return name;
+    }
+
+    /**
+     * @return the id rendered as {@code name:version}
+     */
+    @Override
+    public String toString() {
+        return name + ":" + version;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
index 100da63..046d5c1 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
@@ -74,7 +74,7 @@ public interface IAsterixAppRuntimeContext {
 
     public double getBloomFilterFalsePositiveRate();
 
-    public Object getFeedManager();
+    public Object getActiveManager();
 
     public IRemoteRecoveryManager getRemoteRecoveryManager();
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IExtension.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IExtension.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IExtension.java
new file mode 100644
index 0000000..9551935
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IExtension.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.api;
+
+import java.util.List;
+
+import org.apache.hyracks.algebricks.common.utils.Pair;
+
+/**
+ * An interface for an extension that provides a mechanism to override system behaviour
+ * or add new features.
+ */
+public interface IExtension {
+
+    enum ExtensionKind {
+        /**
+         * Extends the query translator
+         */
+        STATEMENT_EXECUTOR,
+        /**
+         * Extends Metadata
+         */
+        METADATA,
+        /**
+         * Extends Language Syntax and Algebraic Operations
+         */
+        LANG
+    }
+
+    /**
+     * @return Unique Id for the extension (used for lookup operations)
+     */
+    ExtensionId getId();
+
+    /**
+     * Configures the extension with configuration parameters.
+     * This method is called on system boot.
+     *
+     * @param args the configuration parameters, as a list of string pairs
+     */
+    void configure(List<Pair<String, String>> args);
+
+    /**
+     * @return The extension point implemented by this extension
+     */
+    ExtensionKind getExtensionKind();
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/app/SessionConfig.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/app/SessionConfig.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/app/SessionConfig.java
new file mode 100644
index 0000000..f4e3be2
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/app/SessionConfig.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.app;
+
+import java.io.PrintWriter;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksAppendable;
+
+/**
+ * SessionConfig captures several different parameters for controlling
+ * the execution of an APIFramework call.
+ * <li>It specifies how the execution will proceed (for instance,
+ * whether to optimize, or whether to execute at all).
+ * <li>It allows you to specify where the primary execution output will
+ * be sent.
+ * <li>It also allows you to request additional output for optional
+ * out-of-band data about the execution (query plan, etc).
+ * <li>It allows you to specify the output format for the primary
+ * execution output - LOSSLESS_JSON, CSV, etc.
+ * <li>It allows you to specify output format-specific parameters.
+ */
+
+public class SessionConfig {
+    /**
+     * Used to specify the output format for the primary execution.
+     */
+    public enum OutputFormat {
+        ADM,
+        CSV,
+        CLEAN_JSON,
+        LOSSLESS_JSON
+    };
+
+    /**
+     * Produce out-of-band output for Hyracks Job.
+     */
+    public static final String OOB_HYRACKS_JOB = "oob-hyracks-job";
+
+    /**
+     * Produce out-of-band output for Expression Tree.
+     */
+    public static final String OOB_EXPR_TREE = "oob-expr-tree";
+
+    /**
+     * Produce out-of-band output for Rewritten Expression Tree.
+     */
+    public static final String OOB_REWRITTEN_EXPR_TREE = "oob-rewritten-expr-tree";
+
+    /**
+     * Produce out-of-band output for Logical Plan.
+     */
+    public static final String OOB_LOGICAL_PLAN = "oob-logical-plan";
+
+    /**
+     * Produce out-of-band output for Optimized Logical Plan.
+     */
+    public static final String OOB_OPTIMIZED_LOGICAL_PLAN = "oob-optimized-logical-plan";
+
+    /**
+     * Format flag: print only physical ops (for optimizer tests).
+     */
+    public static final String FORMAT_ONLY_PHYSICAL_OPS = "format-only-physical-ops";
+
+    /**
+     * Format flag: wrap out-of-band data in HTML.
+     */
+    public static final String FORMAT_HTML = "format-html";
+
+    /**
+     * Format flag: print CSV header line.
+     */
+    public static final String FORMAT_CSV_HEADER = "format-csv-header";
+
+    /**
+     * Format flag: wrap results in outer array brackets (JSON or ADM).
+     */
+    public static final String FORMAT_WRAPPER_ARRAY = "format-wrapper-array";
+
+    /**
+     * Format flag: indent JSON results.
+     */
+    public static final String FORMAT_INDENT_JSON = "indent-json";
+
+    /**
+     * Format flag: quote records in the results array.
+     */
+    public static final String FORMAT_QUOTE_RECORD = "quote-record";
+
+    @FunctionalInterface
+    public interface ResultDecorator { // hook invoked by resultPrefix() / resultPostfix()
+        AlgebricksAppendable append(AlgebricksAppendable app) throws AlgebricksException;
+    }
+
+    // Standard execution flags.
+    private final boolean executeQuery;
+    private final boolean generateJobSpec;
+    private final boolean optimize;
+
+    // Output path for primary execution.
+    private final PrintWriter out;
+
+    // Output format.
+    private final OutputFormat fmt;
+
+    private final ResultDecorator preResultDecorator; // may be null (no prefix decoration)
+    private final ResultDecorator postResultDecorator; // may be null (no postfix decoration)
+
+    // Flags (OOB_ and FORMAT_ settings); a flag that was never set reads as false.
+    private final Map<String, Boolean> flags;
+
+    /**
+     * Create a SessionConfig object with all default values:
+     * - All format flags set to "false".
+     * - All out-of-band outputs set to "false".
+     * - "Optimize" set to "true".
+     * - "Execute Query" set to "true".
+     * - "Generate Job Spec" set to "true".
+     *
+     * @param out
+     *            PrintWriter for execution output.
+     * @param fmt
+     *            Output format for execution output.
+     */
+    public SessionConfig(PrintWriter out, OutputFormat fmt) {
+        this(out, fmt, null, null, true, true, true);
+    }
+
+    public SessionConfig(PrintWriter out, OutputFormat fmt, ResultDecorator preResultDecorator,
+            ResultDecorator postResultDecorator) {
+        this(out, fmt, preResultDecorator, postResultDecorator, true, true, true); // all execution flags on
+    }
+
+    public SessionConfig(PrintWriter out, OutputFormat fmt, boolean optimize, boolean executeQuery,
+            boolean generateJobSpec) {
+        this(out, fmt, null, null, optimize, executeQuery, generateJobSpec); // no result decorators
+    }
+
+    /**
+     * Create a SessionConfig object with all optional values set to defaults:
+     * - All format flags set to "false".
+     * - All out-of-band outputs set to "false".
+     *
+     * @param out
+     *            PrintWriter for execution output.
+     * @param fmt
+     *            Output format for execution output.
+     * @param optimize
+     *            Whether to optimize the execution.
+     * @param executeQuery
+     *            Whether to execute the query or not.
+     * @param generateJobSpec
+     *            Whether to generate the Hyracks job specification (if
+     *            false, job cannot be executed).
+     */
+    public SessionConfig(PrintWriter out, OutputFormat fmt, ResultDecorator preResultDecorator,
+            ResultDecorator postResultDecorator, boolean optimize, boolean executeQuery, boolean generateJobSpec) {
+        this.out = out;
+        this.fmt = fmt;
+        this.preResultDecorator = preResultDecorator;
+        this.postResultDecorator = postResultDecorator;
+        this.optimize = optimize;
+        this.executeQuery = executeQuery;
+        this.generateJobSpec = generateJobSpec;
+        this.flags = new HashMap<>();
+    }
+
+    /**
+     * Retrieve the PrintWriter to produce output to.
+     */
+    public PrintWriter out() {
+        return this.out;
+    }
+
+    /**
+     * Retrieve the OutputFormat for this execution.
+     */
+    public OutputFormat fmt() {
+        return this.fmt;
+    }
+
+    public AlgebricksAppendable resultPrefix(AlgebricksAppendable app) throws AlgebricksException {
+        return this.preResultDecorator != null ? this.preResultDecorator.append(app) : app;
+    }; // returns app unchanged when no pre-result decorator is set
+
+    public AlgebricksAppendable resultPostfix(AlgebricksAppendable app) throws AlgebricksException {
+        return this.postResultDecorator != null ? this.postResultDecorator.append(app) : app;
+    }; // returns app unchanged when no post-result decorator is set
+
+    /**
+     * Retrieve the value of the "execute query" flag.
+     */
+    public boolean isExecuteQuery() {
+        return executeQuery;
+    }
+
+    /**
+     * Retrieve the value of the "optimize" flag.
+     */
+    public boolean isOptimize() {
+        return optimize;
+    }
+
+    /**
+     * Retrieve the value of the "generate job spec" flag.
+     */
+    public boolean isGenerateJobSpec() {
+        return generateJobSpec;
+    }
+
+    /**
+     * Specify all out-of-band settings at once. For convenience of older code.
+     */
+    public void setOOBData(boolean exprTree, boolean rewrittenExprTree, boolean logicalPlan,
+            boolean optimizedLogicalPlan, boolean hyracksJob) {
+        this.set(OOB_EXPR_TREE, exprTree);
+        this.set(OOB_REWRITTEN_EXPR_TREE, rewrittenExprTree);
+        this.set(OOB_LOGICAL_PLAN, logicalPlan);
+        this.set(OOB_OPTIMIZED_LOGICAL_PLAN, optimizedLogicalPlan);
+        this.set(OOB_HYRACKS_JOB, hyracksJob);
+    }
+
+    /**
+     * Specify a flag.
+     *
+     * @param flag
+     *            One of the OOB_ or FORMAT_ constants from this class.
+     * @param value
+     *            Value for the flag (all flags default to "false").
+     */
+    public void set(String flag, boolean value) {
+        flags.put(flag, Boolean.valueOf(value));
+    }
+
+    /**
+     * Retrieve the setting of a flag previously specified with set().
+     *
+     * @param flag
+     *            One of the OOB_ or FORMAT_ constants from this class.
+     * @return true or false (all flags default to "false").
+     */
+    public boolean is(String flag) {
+        Boolean value = flags.get(flag);
+        return value == null ? false : value.booleanValue();
+    }
+}


Mime
View raw message