asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From buyin...@apache.org
Subject [12/16] asterixdb git commit: Add Asterix Extension Manager
Date Sat, 20 Aug 2016 06:15:54 GMT
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/IStatementExecutorExtension.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/IStatementExecutorExtension.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/IStatementExecutorExtension.java
new file mode 100644
index 0000000..a4b2345
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/IStatementExecutorExtension.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.cc;
+
+import org.apache.asterix.common.api.IExtension;
+import org.apache.asterix.translator.IStatementExecutorFactory;
+
+/**
+ * An interface for extensions of {@code IStatementExecutor}
+ */
+public interface IStatementExecutorExtension extends IExtension {
+    @Override
+    default ExtensionKind getExtensionKind() {
+        return ExtensionKind.STATEMENT_EXECUTOR;
+    }
+
+    /**
+     * @return The extension implementation of the {@code IStatementExecutorFactory}
+     */
+    IStatementExecutorFactory getQueryTranslatorFactory();
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ActiveLifecycleListener.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ActiveLifecycleListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ActiveLifecycleListener.java
index c44ffc1..fe15ce8 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ActiveLifecycleListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ActiveLifecycleListener.java
@@ -26,7 +26,8 @@ import java.util.logging.Logger;
 
 import org.apache.asterix.active.ActiveEvent;
 import org.apache.asterix.active.ActiveJobNotificationHandler;
-import org.apache.asterix.active.ActivePartitionMessage;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.active.message.ActivePartitionMessage;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
 import org.apache.hyracks.api.job.IJobLifecycleListener;
@@ -48,15 +49,17 @@ public class ActiveLifecycleListener implements IJobLifecycleListener {
 
     @Override
     public synchronized void notifyJobStart(JobId jobId) throws HyracksException {
-        if (ActiveJobNotificationHandler.INSTANCE.isActiveJob(jobId)) {
-            jobEventInbox.add(new ActiveEvent(jobId, ActiveEvent.EventKind.JOB_START));
+        EntityId entityId = ActiveJobNotificationHandler.INSTANCE.getEntity(jobId);
+        if (entityId != null) {
+            jobEventInbox.add(new ActiveEvent(jobId, ActiveEvent.EventKind.JOB_START, entityId));
         }
     }
 
     @Override
     public synchronized void notifyJobFinish(JobId jobId) throws HyracksException {
-        if (ActiveJobNotificationHandler.INSTANCE.isActiveJob(jobId)) {
-            jobEventInbox.add(new ActiveEvent(jobId, ActiveEvent.EventKind.JOB_FINISH));
+        EntityId entityId = ActiveJobNotificationHandler.INSTANCE.getEntity(jobId);
+        if (entityId != null) {
+            jobEventInbox.add(new ActiveEvent(jobId, ActiveEvent.EventKind.JOB_FINISH, entityId));
         } else {
             if (LOGGER.isLoggable(Level.INFO)) {
                 LOGGER.info("NO NEED TO NOTIFY JOB FINISH!");
@@ -72,7 +75,7 @@ public class ActiveLifecycleListener implements IJobLifecycleListener {
     public void receive(ActivePartitionMessage message) {
         if (ActiveJobNotificationHandler.INSTANCE.isActiveJob(message.getJobId())) {
             jobEventInbox.add(new ActiveEvent(message.getJobId(), ActiveEvent.EventKind.PARTITION_EVENT,
-                    message.getFeedId(), message.getPayload()));
+                    message.getActiveRuntimeId().getEntityId(), message));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java
index 92ef062..95fe68c 100755
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java
@@ -265,8 +265,8 @@ public class ExternalLibraryUtils {
                     String adapterFactoryClass = adapter.getFactoryClass().trim();
                     String adapterName = libraryName + "#" + adapter.getName().trim();
                     AdapterIdentifier aid = new AdapterIdentifier(dataverse, adapterName);
-                    DatasourceAdapter dsa = new DatasourceAdapter(aid, adapterFactoryClass,
-                            IDataSourceAdapter.AdapterType.EXTERNAL);
+                    DatasourceAdapter dsa =
+                            new DatasourceAdapter(aid, adapterFactoryClass, IDataSourceAdapter.AdapterType.EXTERNAL);
                     MetadataManager.INSTANCE.addAdapter(mdTxnCtx, dsa);
                     if (LOGGER.isLoggable(Level.INFO)) {
                         LOGGER.info("Installed adapter: " + adapterName);
@@ -334,8 +334,8 @@ public class ExternalLibraryUtils {
         }
 
         // get a reference to the specific library dir
-        File libDir = new File(
-                installDir.getAbsolutePath() + File.separator + dataverse + File.separator + libraryName);
+        File libDir =
+                new File(installDir.getAbsolutePath() + File.separator + dataverse + File.separator + libraryName);
         FilenameFilter jarFileFilter = new FilenameFilter() {
             @Override
             public boolean accept(File dir, String name) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java
index ba15fb1..b815602 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java
@@ -22,9 +22,10 @@ import java.io.PrintWriter;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.asterix.api.common.SessionConfig;
-import org.apache.asterix.api.common.SessionConfig.OutputFormat;
-import org.apache.asterix.aql.translator.QueryTranslator;
+import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
+import org.apache.asterix.app.translator.QueryTranslator;
+import org.apache.asterix.common.app.SessionConfig;
+import org.apache.asterix.common.app.SessionConfig.OutputFormat;
 import org.apache.asterix.compiler.provider.AqlCompilationProvider;
 import org.apache.asterix.compiler.provider.ILangCompilationProvider;
 import org.apache.asterix.external.feed.api.IFeedWork;
@@ -36,6 +37,7 @@ import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.lang.common.statement.DataverseDecl;
 import org.apache.asterix.lang.common.struct.Identifier;
 import org.apache.asterix.om.util.AsterixAppContextInfo;
+import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
@@ -68,6 +70,7 @@ public class FeedWorkCollection {
 
         private static class SubscribeFeedWorkRunnable implements Runnable {
 
+            private static final DefaultStatementExecutorFactory qtFactory = new DefaultStatementExecutorFactory(null);
             private final FeedConnectionRequest request;
             private final String[] locations;
 
@@ -82,13 +85,13 @@ public class FeedWorkCollection {
                     //TODO(amoudi): route PrintWriter to log file
                     PrintWriter writer = new PrintWriter(System.err, true);
                     SessionConfig pc = new SessionConfig(writer, OutputFormat.ADM);
-                    DataverseDecl dataverseDecl =
-                            new DataverseDecl(new Identifier(request.getReceivingFeedId().getDataverse()));
+                    DataverseDecl dataverseDecl = new DataverseDecl(
+                            new Identifier(request.getReceivingFeedId().getDataverse()));
                     SubscribeFeedStatement subscribeStmt = new SubscribeFeedStatement(locations, request);
                     List<Statement> statements = new ArrayList<Statement>();
                     statements.add(dataverseDecl);
                     statements.add(subscribeStmt);
-                    QueryTranslator translator = new QueryTranslator(statements, pc, compilationProvider);
+                    IStatementExecutor translator = qtFactory.create(statements, pc, compilationProvider);
                     translator.compileAndExecute(AsterixAppContextInfo.getInstance().getHcc(), null,
                             QueryTranslator.ResultDelivery.SYNC);
                     if (LOGGER.isEnabledFor(Level.INFO)) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java
new file mode 100644
index 0000000..3ebe873
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java
@@ -0,0 +1,475 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.nc;
+
+import java.io.IOException;
+import java.rmi.RemoteException;
+import java.rmi.server.UnicastRemoteObject;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.active.ActiveManager;
+import org.apache.asterix.api.common.AsterixAppRuntimeContextProviderForRecovery;
+import org.apache.asterix.common.api.AsterixThreadExecutor;
+import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.config.AsterixBuildProperties;
+import org.apache.asterix.common.config.AsterixCompilerProperties;
+import org.apache.asterix.common.config.AsterixExtensionProperties;
+import org.apache.asterix.common.config.AsterixExternalProperties;
+import org.apache.asterix.common.config.AsterixFeedProperties;
+import org.apache.asterix.common.config.AsterixMetadataProperties;
+import org.apache.asterix.common.config.AsterixPropertiesAccessor;
+import org.apache.asterix.common.config.AsterixReplicationProperties;
+import org.apache.asterix.common.config.AsterixStorageProperties;
+import org.apache.asterix.common.config.AsterixTransactionProperties;
+import org.apache.asterix.common.config.IAsterixPropertiesProvider;
+import org.apache.asterix.common.context.AsterixFileMapManager;
+import org.apache.asterix.common.context.DatasetLifecycleManager;
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.library.ILibraryManager;
+import org.apache.asterix.common.replication.IRemoteRecoveryManager;
+import org.apache.asterix.common.replication.IReplicaResourcesManager;
+import org.apache.asterix.common.replication.IReplicationChannel;
+import org.apache.asterix.common.replication.IReplicationManager;
+import org.apache.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
+import org.apache.asterix.common.transactions.IRecoveryManager;
+import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
+import org.apache.asterix.common.transactions.ITransactionSubsystem;
+import org.apache.asterix.external.library.ExternalLibraryManager;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataNode;
+import org.apache.asterix.metadata.api.IAsterixStateProxy;
+import org.apache.asterix.metadata.api.IMetadataNode;
+import org.apache.asterix.metadata.bootstrap.MetadataBootstrap;
+import org.apache.asterix.metadata.bootstrap.MetadataIndexImmutableProperties;
+import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.asterix.replication.management.ReplicationChannel;
+import org.apache.asterix.replication.management.ReplicationManager;
+import org.apache.asterix.replication.recovery.RemoteRecoveryManager;
+import org.apache.asterix.replication.storage.ReplicaResourcesManager;
+import org.apache.asterix.transaction.management.resource.GlobalResourceIdFactoryProvider;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepositoryFactory;
+import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem;
+import org.apache.hyracks.api.application.IApplicationConfig;
+import org.apache.hyracks.api.application.INCApplicationContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
+import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+import org.apache.hyracks.storage.am.lsm.common.impls.AsynchronousScheduler;
+import org.apache.hyracks.storage.am.lsm.common.impls.PrefixMergePolicyFactory;
+import org.apache.hyracks.storage.common.buffercache.BufferCache;
+import org.apache.hyracks.storage.common.buffercache.ClockPageReplacementStrategy;
+import org.apache.hyracks.storage.common.buffercache.DelayPageCleanerPolicy;
+import org.apache.hyracks.storage.common.buffercache.HeapBufferAllocator;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.ICacheMemoryAllocator;
+import org.apache.hyracks.storage.common.buffercache.IPageCleanerPolicy;
+import org.apache.hyracks.storage.common.buffercache.IPageReplacementStrategy;
+import org.apache.hyracks.storage.common.file.IFileMapManager;
+import org.apache.hyracks.storage.common.file.IFileMapProvider;
+import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
+import org.apache.hyracks.storage.common.file.ILocalResourceRepositoryFactory;
+import org.apache.hyracks.storage.common.file.IResourceIdFactory;
+
+public class AsterixNCAppRuntimeContext implements IAsterixAppRuntimeContext, IAsterixPropertiesProvider {
+    private static final Logger LOGGER = Logger.getLogger(AsterixNCAppRuntimeContext.class.getName());
+
+    private ILSMMergePolicyFactory metadataMergePolicyFactory;
+    private final INCApplicationContext ncApplicationContext;
+
+    private AsterixCompilerProperties compilerProperties;
+    private AsterixExternalProperties externalProperties;
+    private AsterixMetadataProperties metadataProperties;
+    private AsterixStorageProperties storageProperties;
+    private AsterixTransactionProperties txnProperties;
+    private AsterixFeedProperties feedProperties;
+    private AsterixBuildProperties buildProperties;
+    private AsterixReplicationProperties replicationProperties;
+
+    private AsterixThreadExecutor threadExecutor;
+    private IDatasetLifecycleManager datasetLifecycleManager;
+    private IFileMapManager fileMapManager;
+    private IBufferCache bufferCache;
+    private ITransactionSubsystem txnSubsystem;
+
+    private ILSMIOOperationScheduler lsmIOScheduler;
+    private PersistentLocalResourceRepository localResourceRepository;
+    private IResourceIdFactory resourceIdFactory;
+    private IIOManager ioManager;
+    private boolean isShuttingdown;
+
+    private ActiveManager activeManager;
+
+    private IReplicationChannel replicationChannel;
+    private IReplicationManager replicationManager;
+    private IRemoteRecoveryManager remoteRecoveryManager;
+    private IReplicaResourcesManager replicaResourcesManager;
+    private final int metadataRmiPort;
+
+    private final ILibraryManager libraryManager;
+    private final NCExtensionManager ncExtensionManager;
+
+    public AsterixNCAppRuntimeContext(INCApplicationContext ncApplicationContext, int metadataRmiPort)
+            throws AsterixException, InstantiationException, IllegalAccessException, ClassNotFoundException,
+            IOException {
+        this.ncApplicationContext = ncApplicationContext;
+        // Determine whether to use old-style asterix-configuration.xml or new-style configuration.
+        // QQQ strip this out eventually
+        AsterixPropertiesAccessor propertiesAccessor;
+        IApplicationConfig cfg = ncApplicationContext.getAppConfig();
+        // QQQ this is NOT a good way to determine whether the config is valid
+        if (cfg.getString("cc", "cluster.address") != null) {
+            propertiesAccessor = new AsterixPropertiesAccessor(cfg);
+        } else {
+            propertiesAccessor = new AsterixPropertiesAccessor();
+        }
+        compilerProperties = new AsterixCompilerProperties(propertiesAccessor);
+        externalProperties = new AsterixExternalProperties(propertiesAccessor);
+        metadataProperties = new AsterixMetadataProperties(propertiesAccessor);
+        storageProperties = new AsterixStorageProperties(propertiesAccessor);
+        txnProperties = new AsterixTransactionProperties(propertiesAccessor);
+        feedProperties = new AsterixFeedProperties(propertiesAccessor);
+        buildProperties = new AsterixBuildProperties(propertiesAccessor);
+        replicationProperties = new AsterixReplicationProperties(propertiesAccessor,
+                AsterixClusterProperties.INSTANCE.getCluster());
+        this.metadataRmiPort = metadataRmiPort;
+        libraryManager = new ExternalLibraryManager();
+        ncExtensionManager = new NCExtensionManager(
+                new AsterixExtensionProperties(propertiesAccessor).getExtensions());
+    }
+
+    @Override
+    public void initialize(boolean initialRun) throws IOException, ACIDException {
+        Logger.getLogger("org.apache").setLevel(externalProperties.getLogLevel());
+
+        threadExecutor = new AsterixThreadExecutor(ncApplicationContext.getThreadFactory());
+        fileMapManager = new AsterixFileMapManager();
+        ICacheMemoryAllocator allocator = new HeapBufferAllocator();
+        IPageCleanerPolicy pcp = new DelayPageCleanerPolicy(600000);
+        ioManager = ncApplicationContext.getRootContext().getIOManager();
+        IPageReplacementStrategy prs = new ClockPageReplacementStrategy(allocator,
+                storageProperties.getBufferCachePageSize(), storageProperties.getBufferCacheNumPages());
+
+        AsynchronousScheduler.INSTANCE.init(ncApplicationContext.getThreadFactory());
+        lsmIOScheduler = AsynchronousScheduler.INSTANCE;
+
+        metadataMergePolicyFactory = new PrefixMergePolicyFactory();
+
+        ILocalResourceRepositoryFactory persistentLocalResourceRepositoryFactory =
+                new PersistentLocalResourceRepositoryFactory(ioManager, ncApplicationContext.getNodeId(),
+                        metadataProperties);
+
+        localResourceRepository = (PersistentLocalResourceRepository) persistentLocalResourceRepositoryFactory
+                .createRepository();
+
+        IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider =
+                new AsterixAppRuntimeContextProviderForRecovery(this);
+        txnSubsystem = new TransactionSubsystem(ncApplicationContext.getNodeId(), asterixAppRuntimeContextProvider,
+                txnProperties);
+
+        IRecoveryManager recoveryMgr = txnSubsystem.getRecoveryManager();
+        SystemState systemState = recoveryMgr.getSystemState();
+        if (initialRun || systemState == SystemState.NEW_UNIVERSE) {
+            //delete any storage data before the resource factory is initialized
+            localResourceRepository.deleteStorageData(true);
+        }
+        initializeResourceIdFactory();
+
+        datasetLifecycleManager = new DatasetLifecycleManager(storageProperties, localResourceRepository,
+                MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID, txnSubsystem.getLogManager(),
+                ioManager.getIODevices().size());
+
+        isShuttingdown = false;
+
+        activeManager = new ActiveManager(ncApplicationContext.getNodeId(),
+                feedProperties.getMemoryComponentGlobalBudget(), compilerProperties.getFrameSize());
+
+        if (replicationProperties.isReplicationEnabled()) {
+            String nodeId = ncApplicationContext.getNodeId();
+
+            replicaResourcesManager = new ReplicaResourcesManager(localResourceRepository, metadataProperties);
+
+            replicationManager = new ReplicationManager(nodeId, replicationProperties, replicaResourcesManager,
+                    txnSubsystem.getLogManager(), asterixAppRuntimeContextProvider);
+
+            //pass replication manager to replication required object
+            //LogManager to replicate logs
+            txnSubsystem.getLogManager().setReplicationManager(replicationManager);
+
+            //PersistentLocalResourceRepository to replicate metadata files and delete backups on drop index
+            localResourceRepository.setReplicationManager(replicationManager);
+
+            /**
+             * add the partitions that will be replicated in this node as inactive partitions
+             */
+            //get nodes which replicate to this node
+            Set<String> replicationClients = replicationProperties.getNodeReplicationClients(nodeId);
+            //remove the node itself
+            replicationClients.remove(nodeId);
+            for (String clientId : replicationClients) {
+                //get the partitions of each client
+                ClusterPartition[] clientPartitions = metadataProperties.getNodePartitions().get(clientId);
+                for (ClusterPartition partition : clientPartitions) {
+                    localResourceRepository.addInactivePartition(partition.getPartitionId());
+                }
+            }
+
+            //initialize replication channel
+            replicationChannel = new ReplicationChannel(nodeId, replicationProperties, txnSubsystem.getLogManager(),
+                    replicaResourcesManager, replicationManager, ncApplicationContext,
+                    asterixAppRuntimeContextProvider);
+
+            remoteRecoveryManager = new RemoteRecoveryManager(replicationManager, this, replicationProperties);
+
+            bufferCache = new BufferCache(ioManager, prs, pcp, fileMapManager,
+                    storageProperties.getBufferCacheMaxOpenFiles(), ncApplicationContext.getThreadFactory(),
+                    replicationManager);
+        } else {
+            bufferCache = new BufferCache(ioManager, prs, pcp, fileMapManager,
+                    storageProperties.getBufferCacheMaxOpenFiles(), ncApplicationContext.getThreadFactory());
+        }
+
+        /*
+         * The order of registration is important. The buffer cache must registered before recovery and transaction
+         * managers. Notes: registered components are stopped in reversed order
+         */
+        ILifeCycleComponentManager lccm = ncApplicationContext.getLifeCycleComponentManager();
+        lccm.register((ILifeCycleComponent) bufferCache);
+        /**
+         * LogManager must be stopped after RecoveryManager, DatasetLifeCycleManager, and ReplicationManager
+         * to process any logs that might be generated during stopping these components
+         */
+        lccm.register((ILifeCycleComponent) txnSubsystem.getLogManager());
+        /**
+         * ReplicationManager must be stopped after indexLifecycleManager and recovery manager
+         * so that any logs/files generated during closing datasets or checkpoints are sent to remote replicas
+         */
+        if (replicationManager != null) {
+            lccm.register(replicationManager);
+        }
+        lccm.register((ILifeCycleComponent) txnSubsystem.getRecoveryManager());
+        /**
+         * Stopping indexLifecycleManager will flush and close all datasets.
+         */
+        lccm.register((ILifeCycleComponent) datasetLifecycleManager);
+        lccm.register((ILifeCycleComponent) txnSubsystem.getTransactionManager());
+        lccm.register((ILifeCycleComponent) txnSubsystem.getLockManager());
+    }
+
+    @Override
+    public boolean isShuttingdown() {
+        return isShuttingdown;
+    }
+
+    @Override
+    public void setShuttingdown(boolean isShuttingdown) {
+        this.isShuttingdown = isShuttingdown;
+    }
+
+    @Override
+    public void deinitialize() throws HyracksDataException {
+    }
+
+    @Override
+    public IBufferCache getBufferCache() {
+        return bufferCache;
+    }
+
+    @Override
+    public IFileMapProvider getFileMapManager() {
+        return fileMapManager;
+    }
+
+    @Override
+    public ITransactionSubsystem getTransactionSubsystem() {
+        return txnSubsystem;
+    }
+
+    @Override
+    public IDatasetLifecycleManager getDatasetLifecycleManager() {
+        return datasetLifecycleManager;
+    }
+
+    @Override
+    public double getBloomFilterFalsePositiveRate() {
+        return storageProperties.getBloomFilterFalsePositiveRate();
+    }
+
+    @Override
+    public ILSMIOOperationScheduler getLSMIOScheduler() {
+        return lsmIOScheduler;
+    }
+
+    @Override
+    public ILocalResourceRepository getLocalResourceRepository() {
+        return localResourceRepository;
+    }
+
+    @Override
+    public IResourceIdFactory getResourceIdFactory() {
+        return resourceIdFactory;
+    }
+
+    @Override
+    public IIOManager getIOManager() {
+        return ioManager;
+    }
+
+    @Override
+    public AsterixStorageProperties getStorageProperties() {
+        return storageProperties;
+    }
+
+    @Override
+    public AsterixTransactionProperties getTransactionProperties() {
+        return txnProperties;
+    }
+
+    @Override
+    public AsterixCompilerProperties getCompilerProperties() {
+        return compilerProperties;
+    }
+
+    @Override
+    public AsterixMetadataProperties getMetadataProperties() {
+        return metadataProperties;
+    }
+
+    @Override
+    public AsterixExternalProperties getExternalProperties() {
+        return externalProperties;
+    }
+
+    @Override
+    public AsterixFeedProperties getFeedProperties() {
+        return feedProperties;
+    }
+
+    @Override
+    public AsterixBuildProperties getBuildProperties() {
+        return buildProperties;
+    }
+
+    @Override
+    public ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID) {
+        return datasetLifecycleManager.getOperationTracker(datasetID);
+    }
+
+    @Override
+    public AsterixThreadExecutor getThreadExecutor() {
+        return threadExecutor;
+    }
+
+    @Override
+    public ILSMMergePolicyFactory getMetadataMergePolicyFactory() {
+        return metadataMergePolicyFactory;
+    }
+
+    @Override
+    public ActiveManager getActiveManager() {
+        return activeManager;
+    }
+
+    @Override
+    public AsterixReplicationProperties getReplicationProperties() {
+        return replicationProperties;
+    }
+
+    @Override
+    public IReplicationChannel getReplicationChannel() {
+        return replicationChannel;
+    }
+
+    @Override
+    public IReplicaResourcesManager getReplicaResourcesManager() {
+        return replicaResourcesManager;
+    }
+
+    @Override
+    public IRemoteRecoveryManager getRemoteRecoveryManager() {
+        return remoteRecoveryManager;
+    }
+
+    @Override
+    public IReplicationManager getReplicationManager() {
+        return replicationManager;
+    }
+
+    @Override
+    public ILibraryManager getLibraryManager() {
+        return libraryManager;
+    }
+
+    @Override
+    public void initializeResourceIdFactory() throws HyracksDataException {
+        resourceIdFactory = new GlobalResourceIdFactoryProvider(ncApplicationContext).createResourceIdFactory();
+    }
+
+    @Override
+    public void initializeMetadata(boolean newUniverse) throws Exception {
+        IAsterixStateProxy proxy;
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Bootstrapping metadata");
+        }
+        MetadataNode.INSTANCE.initialize(this, ncExtensionManager.getMetadataTupleTranslatorProvider(),
+                ncExtensionManager.getMetadataExtensions());
+
+        proxy = (IAsterixStateProxy) ncApplicationContext.getDistributedState();
+        if (proxy == null) {
+            throw new IllegalStateException("Metadata node cannot access distributed state");
+        }
+
+        // This is a special case, we just give the metadataNode directly.
+        // This way we can delay the registration of the metadataNode until
+        // it is completely initialized.
+        MetadataManager.instantiate(new MetadataManager(proxy, MetadataNode.INSTANCE));
+        MetadataBootstrap.startUniverse(this, ncApplicationContext, newUniverse);
+        MetadataBootstrap.startDDLRecovery();
+        ncExtensionManager.initializeMetadata();
+
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Metadata node bound");
+        }
+    }
+
+    @Override
+    public void exportMetadataNodeStub() throws RemoteException {
+        IMetadataNode stub = (IMetadataNode) UnicastRemoteObject.exportObject(MetadataNode.INSTANCE, metadataRmiPort);
+        ((IAsterixStateProxy) ncApplicationContext.getDistributedState()).setMetadataNode(stub);
+    }
+
+    @Override
+    public void unexportMetadataNodeStub() throws RemoteException {
+        UnicastRemoteObject.unexportObject(MetadataNode.INSTANCE, false);
+    }
+
+    public NCExtensionManager getNcExtensionManager() {
+        return ncExtensionManager;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCExtensionManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCExtensionManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCExtensionManager.java
new file mode 100644
index 0000000..759bf09
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCExtensionManager.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.nc;
+
+import java.rmi.RemoteException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.app.cc.CompilerExtensionManager;
+import org.apache.asterix.common.api.IExtension;
+import org.apache.asterix.common.config.AsterixExtension;
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.metadata.api.IMetadataExtension;
+import org.apache.asterix.metadata.entitytupletranslators.MetadataTupleTranslatorProvider;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * AsterixDB's implementation of {@code INCExtensionManager} which takes care of
+ * initializing extensions on Node Controllers
+ */
+public class NCExtensionManager {
+
+    private final MetadataTupleTranslatorProvider tupleTranslatorProvider;
+    private final List<IMetadataExtension> mdExtensions;
+
+    /**
+     * Initialize {@code CCExtensionManager} from configuration
+     *
+     * @param list
+     * @throws InstantiationException
+     * @throws IllegalAccessException
+     * @throws ClassNotFoundException
+     * @throws HyracksDataException
+     */
+    public NCExtensionManager(List<AsterixExtension> list)
+            throws InstantiationException, IllegalAccessException, ClassNotFoundException, HyracksDataException {
+        MetadataTupleTranslatorProvider ttp = null;
+        IMetadataExtension tupleTranslatorExtension = null;
+        mdExtensions = new ArrayList<>();
+        if (list != null) {
+            for (AsterixExtension extensionConf : list) {
+                IExtension extension = (IExtension) Class.forName(extensionConf.getClassName()).newInstance();
+                extension.configure(extensionConf.getArgs());
+                switch (extension.getExtensionKind()) {
+                    case METADATA:
+                        IMetadataExtension mde = (IMetadataExtension) extension;
+                        mdExtensions.add(mde);
+                        ttp = extendTupleTranslator(ttp, tupleTranslatorExtension, mde);
+                        tupleTranslatorExtension = ttp == null ? null : mde;
+                        break;
+                    default:
+                        break;
+                }
+            }
+        }
+        this.tupleTranslatorProvider = ttp == null ? new MetadataTupleTranslatorProvider() : ttp;
+    }
+
+    private MetadataTupleTranslatorProvider extendTupleTranslator(MetadataTupleTranslatorProvider ttp,
+            IMetadataExtension tupleTranslatorExtension, IMetadataExtension mde) throws HyracksDataException {
+        if (ttp != null) {
+            throw new HyracksDataException(ErrorCode.ASTERIX, ErrorCode.ERROR_EXTENSION_CONFLICT,
+                    CompilerExtensionManager.ERROR_MESSAGE_COMPONENT_CONFLICT, tupleTranslatorExtension.getId(),
+                    mde.getId(), IMetadataExtension.class.getSimpleName());
+        }
+        return mde.getMetadataTupleTranslatorProvider();
+    }
+
+    public List<IMetadataExtension> getMetadataExtensions() {
+        return mdExtensions;
+    }
+
+    public MetadataTupleTranslatorProvider getMetadataTupleTranslatorProvider() {
+        return tupleTranslatorProvider;
+    }
+
+    /**
+     * Called on bootstrap of metadata node allowing extensions to instantiate their Metadata artifacts
+     *
+     * @throws HyracksDataException
+     */
+    public void initializeMetadata() throws HyracksDataException {
+        if (mdExtensions != null) {
+            for (IMetadataExtension mdExtension : mdExtensions) {
+                try {
+                    mdExtension.initializeMetadata();
+                } catch (RemoteException | ACIDException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java
new file mode 100644
index 0000000..9b1b32b
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.result;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.common.app.SessionConfig;
+import org.apache.asterix.common.utils.JSONUtil;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutor.Stats;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksAppendable;
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.control.nc.resources.memory.FrameManager;
+
+public class ResultPrinter {
+
+    // TODO(tillw): Should this be static?
+    private static FrameManager resultDisplayFrameMgr = new FrameManager(ResultReader.FRAME_SIZE);
+
+    private final SessionConfig conf;
+    private final Stats stats;
+    private final ARecordType recordType;
+
+    private boolean indentJSON;
+    private boolean quoteRecord;
+
+    // Whether we are wrapping the output sequence in an array
+    private boolean wrapArray = false;
+    // Whether this is the first instance being output
+    private boolean notFirst = false;
+
+    public ResultPrinter(SessionConfig conf, Stats stats, ARecordType recordType) {
+        this.conf = conf;
+        this.stats = stats;
+        this.recordType = recordType;
+        this.indentJSON = conf.is(SessionConfig.FORMAT_INDENT_JSON);
+        this.quoteRecord = conf.is(SessionConfig.FORMAT_QUOTE_RECORD);
+    }
+
+    private static void appendCSVHeader(Appendable app, ARecordType recordType) throws HyracksDataException {
+        try {
+            String[] fieldNames = recordType.getFieldNames();
+            boolean notfirst = false;
+            for (String name : fieldNames) {
+                if (notfirst) {
+                    app.append(',');
+                }
+                notfirst = true;
+                app.append('"').append(name.replace("\"", "\"\"")).append('"');
+            }
+            app.append("\r\n");
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    private void printPrefix() throws HyracksDataException {
+        // If we're outputting CSV with a header, the HTML header was already
+        // output by displayCSVHeader(), so skip it here
+        if (conf.is(SessionConfig.FORMAT_HTML)) {
+            conf.out().println("<h4>Results:</h4>");
+            conf.out().println("<pre>");
+        }
+
+        try {
+            conf.resultPrefix(new AlgebricksAppendable(conf.out()));
+        } catch (AlgebricksException e) {
+            throw new HyracksDataException(e);
+        }
+
+        if (conf.is(SessionConfig.FORMAT_WRAPPER_ARRAY)) {
+            conf.out().print("[ ");
+            wrapArray = true;
+        }
+
+        if (conf.fmt() == SessionConfig.OutputFormat.CSV && conf.is(SessionConfig.FORMAT_CSV_HEADER)) {
+            if (recordType == null) {
+                throw new HyracksDataException("Cannot print CSV with header without specifying output-record-type");
+            }
+            if (quoteRecord) {
+                StringWriter sw = new StringWriter();
+                appendCSVHeader(sw, recordType);
+                conf.out().print(JSONUtil.quoteAndEscape(sw.toString()));
+                conf.out().print("\n");
+                notFirst = true;
+            } else {
+                appendCSVHeader(conf.out(), recordType);
+            }
+        }
+    }
+
+    private void printPostfix() throws HyracksDataException {
+        conf.out().flush();
+        if (wrapArray) {
+            conf.out().println(" ]");
+        }
+        try {
+            conf.resultPostfix(new AlgebricksAppendable(conf.out()));
+        } catch (AlgebricksException e) {
+            throw new HyracksDataException(e);
+        }
+        if (conf.is(SessionConfig.FORMAT_HTML)) {
+            conf.out().println("</pre>");
+        }
+    }
+
+    private void displayRecord(String result) {
+        String record = result;
+        if (indentJSON) {
+            // TODO(tillw): this is inefficient - do this during record generation
+            record = JSONUtil.indent(record, 2);
+        }
+        if (conf.fmt() == SessionConfig.OutputFormat.CSV) {
+            // TODO(tillw): this is inefficient as well
+            record = record + "\r\n";
+        }
+        if (quoteRecord) {
+            // TODO(tillw): this is inefficient as well
+            record = JSONUtil.quoteAndEscape(record);
+        }
+        conf.out().print(record);
+        stats.setCount(stats.getCount() + 1);
+        // TODO(tillw) fix this approximation
+        stats.setSize(stats.getSize() + record.length());
+    }
+
+    public void print(String record) throws HyracksDataException {
+        printPrefix();
+        // TODO(tillw) evil hack
+        quoteRecord = true;
+        displayRecord(record);
+        printPostfix();
+    }
+
+    public void print(ResultReader resultReader) throws HyracksDataException {
+        printPrefix();
+
+        final IFrameTupleAccessor fta = resultReader.getFrameTupleAccessor();
+        final IFrame frame = new VSizeFrame(resultDisplayFrameMgr);
+
+        while (resultReader.read(frame) > 0) {
+            final ByteBuffer frameBuffer = frame.getBuffer();
+            final byte[] frameBytes = frameBuffer.array();
+            fta.reset(frameBuffer);
+            final int last = fta.getTupleCount();
+            for (int tIndex = 0; tIndex < last; tIndex++) {
+                final int start = fta.getTupleStartOffset(tIndex);
+                int length = fta.getTupleEndOffset(tIndex) - start;
+                if (conf.fmt() == SessionConfig.OutputFormat.CSV
+                        && ((length > 0) && (frameBytes[start + length - 1] == '\n'))) {
+                    length--;
+                }
+                String result = new String(frameBytes, start, length, UTF_8);
+                if (wrapArray && notFirst) {
+                    conf.out().print(", ");
+                }
+                notFirst = true;
+                displayRecord(result);
+            }
+            frameBuffer.clear();
+        }
+
+        printPostfix();
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultReader.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultReader.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultReader.java
new file mode 100644
index 0000000..99235be
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultReader.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.result;
+
+import org.apache.asterix.om.util.AsterixAppContextInfo;
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.dataset.DatasetJobRecord.Status;
+import org.apache.hyracks.api.dataset.IHyracksDataset;
+import org.apache.hyracks.api.dataset.IHyracksDatasetReader;
+import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.dataflow.common.comm.io.ResultFrameTupleAccessor;
+
+public class ResultReader {
+    private final IHyracksDataset hyracksDataset;
+
+    private IHyracksDatasetReader reader;
+
+    private IFrameTupleAccessor frameTupleAccessor;
+
+    // Number of parallel result reader buffers
+    public static final int NUM_READERS = 1;
+
+    public static final int FRAME_SIZE = AsterixAppContextInfo.getInstance().getCompilerProperties().getFrameSize();
+
+    public ResultReader(IHyracksDataset hdc) {
+        hyracksDataset = hdc;
+    }
+
+    public void open(JobId jobId, ResultSetId resultSetId) throws HyracksDataException {
+        reader = hyracksDataset.createReader(jobId, resultSetId);
+        frameTupleAccessor = new ResultFrameTupleAccessor();
+    }
+
+    public Status getStatus() {
+        return reader.getResultStatus();
+    }
+
+    public int read(IFrame frame) throws HyracksDataException {
+        return reader.read(frame);
+    }
+
+    public IFrameTupleAccessor getFrameTupleAccessor() {
+        return frameTupleAccessor;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultUtil.java
new file mode 100644
index 0000000..b5fd96e
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultUtil.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.result;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.AbstractMap;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.asterix.api.http.servlet.APIServlet;
+import org.apache.asterix.common.app.SessionConfig;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.translator.IStatementExecutor.Stats;
+import org.apache.http.ParseException;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.log4j.Logger;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+public class ResultUtil {
+    private static final Logger LOGGER = Logger.getLogger(ResultUtil.class.getName());
+    public static final Map<Character, String> HTML_ENTITIES = Collections.unmodifiableMap(Stream.of(
+            new AbstractMap.SimpleImmutableEntry<>('"', "&quot;"), new AbstractMap.SimpleImmutableEntry<>('&', "&amp;"),
+            new AbstractMap.SimpleImmutableEntry<>('<', "&lt;"), new AbstractMap.SimpleImmutableEntry<>('>', "&gt;"))
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
+
+    private ResultUtil() {
+
+    }
+
+    /**
+     * escapes html entities in aString
+     *
+     * @param aString
+     * @return escaped String
+     */
+    public static String escapeHTML(String aString) {
+        String escaped = aString;
+        for (Entry<Character, String> entry : HTML_ENTITIES.entrySet()) {
+            if (escaped.indexOf(entry.getKey()) >= 0) {
+                escaped = escaped.replace(entry.getKey().toString(), entry.getValue());
+            }
+        }
+        return escaped;
+    }
+
+    public static void displayResults(ResultReader resultReader, SessionConfig conf, Stats stats,
+            ARecordType recordType) throws HyracksDataException {
+        new ResultPrinter(conf, stats, recordType).print(resultReader);
+    }
+
+    public static void displayResults(String record, SessionConfig conf, Stats stats, ARecordType recordType)
+            throws HyracksDataException {
+        new ResultPrinter(conf, stats, recordType).print(record);
+    }
+
+    public static JSONObject getErrorResponse(int errorCode, String errorMessage, String errorSummary,
+            String errorStackTrace) {
+        JSONObject errorResp = new JSONObject();
+        JSONArray errorArray = new JSONArray();
+        errorArray.put(errorCode);
+        errorArray.put(errorMessage);
+        try {
+            errorResp.put("error-code", errorArray);
+            if (!"".equals(errorSummary)) {
+                errorResp.put("summary", errorSummary);
+            } else {
+                //parse exception
+                errorResp.put("summary", errorMessage);
+            }
+            errorResp.put("stacktrace", errorStackTrace);
+        } catch (JSONException e) {
+            LOGGER.warn("Failed to build the result's JSON object", e);
+            // TODO(madhusudancs): Figure out what to do when JSONException occurs while building the results.
+        }
+        return errorResp;
+    }
+
+    public static void webUIErrorHandler(PrintWriter out, Exception e) {
+        String errorTemplate = readTemplateFile("/webui/errortemplate.html", "%s\n%s\n%s");
+
+        String errorOutput = String.format(errorTemplate, escapeHTML(extractErrorMessage(e)),
+                escapeHTML(extractErrorSummary(e)), escapeHTML(extractFullStackTrace(e)));
+        out.println(errorOutput);
+    }
+
+    public static void webUIParseExceptionHandler(PrintWriter out, Throwable e, String query) {
+        String errorTemplate = readTemplateFile("/webui/errortemplate_message.html", "<pre class=\"error\">%s\n</pre>");
+
+        String errorOutput = String.format(errorTemplate, buildParseExceptionMessage(e, query));
+        out.println(errorOutput);
+    }
+
+    public static void apiErrorHandler(PrintWriter out, Exception e) {
+        int errorCode = 99;
+        if (e instanceof ParseException) {
+            errorCode = 2;
+        } else if (e instanceof AlgebricksException) {
+            errorCode = 3;
+        } else if (e instanceof HyracksDataException) {
+            errorCode = 4;
+        }
+
+        JSONObject errorResp = ResultUtil.getErrorResponse(errorCode, extractErrorMessage(e), extractErrorSummary(e),
+                extractFullStackTrace(e));
+        out.write(errorResp.toString());
+    }
+
+    public static String buildParseExceptionMessage(Throwable e, String query) {
+        StringBuilder errorMessage = new StringBuilder();
+        String message = e.getMessage();
+        message = message.replace("<", "&lt");
+        message = message.replace(">", "&gt");
+        errorMessage.append("Error: " + message + "\n");
+        int pos = message.indexOf("line");
+        if (pos > 0) {
+            Pattern p = Pattern.compile("\\d+");
+            Matcher m = p.matcher(message);
+            if (m.find(pos)) {
+                int lineNo = Integer.parseInt(message.substring(m.start(), m.end()));
+                String[] lines = query.split("\n");
+                if (lineNo > lines.length) {
+                    errorMessage.append("===> &ltBLANK LINE&gt \n");
+                } else {
+                    String line = lines[lineNo - 1];
+                    errorMessage.append("==> " + line);
+                }
+            }
+        }
+        return errorMessage.toString();
+    }
+
+    private static Throwable getRootCause(Throwable cause) {
+        Throwable currentCause = cause;
+        Throwable nextCause = cause.getCause();
+        while (nextCause != null && nextCause != currentCause) {
+            currentCause = nextCause;
+            nextCause = cause.getCause();
+        }
+        return currentCause;
+    }
+
+    /**
+     * Extract the message in the root cause of the stack trace:
+     *
+     * @param e
+     * @return error message string.
+     */
+    private static String extractErrorMessage(Throwable e) {
+        Throwable cause = getRootCause(e);
+        String fullyQualifiedExceptionClassName = cause.getClass().getName();
+        String[] hierarchySplits = fullyQualifiedExceptionClassName.split("\\.");
+        //try returning the class without package qualification
+        String exceptionClassName = hierarchySplits[hierarchySplits.length - 1];
+        String localizedMessage = cause.getLocalizedMessage();
+        if (localizedMessage == null) {
+            localizedMessage = "Internal error. Please check instance logs for further details.";
+        }
+        return localizedMessage + " [" + exceptionClassName + "]";
+    }
+
+    /**
+     * Extract the meaningful part of a stack trace:
+     * a. the causes in the stack trace hierarchy
+     * b. the top exception for each cause
+     *
+     * @param e
+     * @return the contacted message containing a and b.
+     */
+    private static String extractErrorSummary(Throwable e) {
+        StringBuilder errorMessageBuilder = new StringBuilder();
+        Throwable cause = e;
+        errorMessageBuilder.append(cause.getLocalizedMessage());
+        while (cause != null) {
+            StackTraceElement[] stackTraceElements = cause.getStackTrace();
+            errorMessageBuilder.append(stackTraceElements.length > 0 ? "\n caused by: " + stackTraceElements[0] : "");
+            cause = cause.getCause();
+        }
+        return errorMessageBuilder.toString();
+    }
+
+    /**
+     * Extract the full stack trace:
+     *
+     * @param e
+     * @return the string containing the full stack trace of the error.
+     */
+    public static String extractFullStackTrace(Throwable e) {
+        StringWriter stringWriter = new StringWriter();
+        PrintWriter printWriter = new PrintWriter(stringWriter);
+        e.printStackTrace(printWriter);
+        return stringWriter.toString();
+    }
+
+    /**
+     * Read the template file which is stored as a resource and return its content. If the file does not exist or is
+     * not readable return the default template string.
+     *
+     * @param path
+     *            The path to the resource template file
+     * @param defaultTemplate
+     *            The default template string if the template file does not exist or is not readable
+     * @return The template string to be used to render the output.
+     */
+    //TODO(till|amoudi|mblow|yingyi|ceej|imaxon): path is ignored completely!!
+    private static String readTemplateFile(String path, String defaultTemplate) {
+        String errorTemplate = defaultTemplate;
+        try {
+            String resourcePath = "/webui/errortemplate_message.html";
+            InputStream is = APIServlet.class.getResourceAsStream(resourcePath);
+            InputStreamReader isr = new InputStreamReader(is);
+            StringBuilder sb = new StringBuilder();
+            BufferedReader br = new BufferedReader(isr);
+            String line = br.readLine();
+
+            while (line != null) {
+                sb.append(line);
+                line = br.readLine();
+            }
+            errorTemplate = sb.toString();
+        } catch (IOException ioe) {
+            LOGGER.warn("Unable to read template error message file", ioe);
+        }
+        return errorTemplate;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java
new file mode 100644
index 0000000..c802d08
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.translator;
+
+import java.util.List;
+
+import org.apache.asterix.app.cc.CompilerExtensionManager;
+import org.apache.asterix.common.app.SessionConfig;
+import org.apache.asterix.compiler.provider.ILangCompilationProvider;
+import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.translator.IStatementExecutorFactory;
+
+public class DefaultStatementExecutorFactory implements IStatementExecutorFactory {
+
+    protected final CompilerExtensionManager cExtensionManager;
+
+    public DefaultStatementExecutorFactory(CompilerExtensionManager cExtensionManager) {
+        this.cExtensionManager = cExtensionManager;
+    }
+
+    @Override
+    public QueryTranslator create(List<Statement> aqlStatements, SessionConfig conf,
+            ILangCompilationProvider compilationProvider) {
+        return new QueryTranslator(aqlStatements, conf, compilationProvider, cExtensionManager);
+    }
+
+}


Mime
View raw message