asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ti...@apache.org
Subject [2/2] asterixdb git commit: Enable HTTP API processing on NCs
Date Thu, 11 May 2017 20:44:00 GMT
Enable HTTP API processing on NCs

- Query/Status/Result are answered by NC nodes
- other HTTP requests are proxied to the CC node
- SessionConfig refactoring – split into config and output (SessionOutput)
- TestExecutor now can send http requests do multiple nodes (round robin)

Change-Id: I19414a23e163fc4deef9805c8f9089609f1ebe07
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1709
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/4671f712
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/4671f712
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/4671f712

Branch: refs/heads/master
Commit: 4671f7127c484b805b22ad3ff95c08c47a031d12
Parents: 7a0ba78
Author: Till Westmann <tillw@apache.org>
Authored: Thu May 11 09:13:57 2017 -0700
Committer: Till Westmann <tillw@apache.org>
Committed: Thu May 11 13:43:30 2017 -0700

----------------------------------------------------------------------
 .../asterix/translator/IStatementExecutor.java  |  26 ++-
 .../translator/IStatementExecutorFactory.java   |   6 +-
 .../asterix/translator/SessionConfig.java       |  78 ++-------
 .../asterix/translator/SessionOutput.java       |  88 +++++++++++
 .../apache/asterix/api/common/APIFramework.java |  71 +++++----
 .../http/server/AbstractQueryApiServlet.java    |   6 +-
 .../asterix/api/http/server/ApiServlet.java     |   8 +-
 .../api/http/server/NCQueryServiceServlet.java  |  94 +++++++++++
 .../api/http/server/QueryResultApiServlet.java  |  10 +-
 .../api/http/server/QueryServiceServlet.java    | 115 +++++++-------
 .../api/http/server/QueryStatusApiServlet.java  |   4 +-
 .../asterix/api/http/server/RestApiServlet.java |  23 +--
 .../asterix/api/http/server/ResultUtil.java     |  55 +++++--
 .../api/http/servlet/ServletConstants.java      |   3 +-
 .../asterix/api/java/AsterixJavaClient.java     |   8 +-
 .../asterix/app/cc/CCExtensionManager.java      |  12 +-
 .../message/ExecuteStatementRequestMessage.java | 157 +++++++++++++++++++
 .../ExecuteStatementResponseMessage.java        |  86 ++++++++++
 .../asterix/app/result/ResultPrinter.java       |  37 +++--
 .../DefaultStatementExecutorFactory.java        |   6 +-
 .../asterix/app/translator/QueryTranslator.java |  72 +++++----
 .../hyracks/bootstrap/CCApplication.java        |  42 +++--
 .../hyracks/bootstrap/NCApplication.java        |  26 ++-
 .../asterix/messaging/NCMessageBroker.java      |  29 ++++
 .../apache/asterix/utils/FeedOperations.java    |  10 +-
 .../aql/translator/QueryTranslatorTest.java     |   6 +-
 .../asterix/test/common/TestExecutor.java       |  40 +++--
 .../async-deferred/AsyncDeferredQueries.xml     |  52 ++++++
 .../resources/runtimets/testsuite_sqlpp.xml     |  37 +----
 .../common/dataflow/ICcApplicationContext.java  |  20 +++
 .../common/messaging/api/INCMessageBroker.java  |  15 +-
 .../common/messaging/api/MessageFuture.java     |  40 +++++
 asterixdb/asterix-external-data/pom.xml         |   4 +
 .../installer/test/AsterixLifecycleIT.java      |   1 +
 .../runtime/utils/CcApplicationContext.java     |  12 +-
 .../src/main/assembly/filter.properties         |   3 +-
 .../src/main/opt/local/conf/cc.conf             |   1 +
 .../testframework/xml/TestSuiteParser.java      |  17 +-
 asterixdb/asterix-yarn/pom.xml                  |   6 -
 .../apache/hyracks/http/server/HttpServer.java  |  14 +-
 .../hyracks/http/server/HttpServerHandler.java  |  24 ++-
 .../http/server/HttpServerInitializer.java      |   2 +-
 42 files changed, 1008 insertions(+), 358 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
index 92c487b..19f0dcc 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
@@ -18,17 +18,24 @@
  */
 package org.apache.asterix.translator;
 
+import java.io.Serializable;
 import java.rmi.RemoteException;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.lang.common.statement.Query;
 import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement;
+import org.apache.commons.lang3.tuple.Triple;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.client.IClusterInfoCollector;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.dataset.IHyracksDataset;
+import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 
 /**
@@ -54,6 +61,16 @@ public interface IStatementExecutor {
         ASYNC
     }
 
+    class ResultMetadata implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        private final List<Triple<JobId, ResultSetId, ARecordType>> resultSets = new ArrayList<>();
+
+        public List<Triple<JobId, ResultSetId, ARecordType>> getResultSets() {
+            return resultSets;
+        }
+    }
+
     public static class Stats {
         private long count;
         private long size;
@@ -85,12 +102,14 @@ public interface IStatementExecutor {
      *            A Hyracks dataset client object that is used to read the results.
      * @param resultDelivery
      *            The {@code ResultDelivery} kind required for queries in the list of statements
+     * @param outMetadata
+     *            a reference to write the metadata of executed queries
      * @param stats
      *            a reference to write the stats of executed queries
      * @throws Exception
      */
     void compileAndExecute(IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery,
-            Stats stats) throws Exception;
+            ResultMetadata outMetadata, Stats stats) throws Exception;
 
     /**
      * Compiles and execute a list of statements, with passing in client context id and context.
@@ -101,6 +120,8 @@ public interface IStatementExecutor {
      *            A Hyracks dataset client object that is used to read the results.
      * @param resultDelivery
      *            The {@code ResultDelivery} kind required for queries in the list of statements
+     * @param outMetadata
+     *            a reference to write the metadata of executed queries
      * @param stats
      *            a reference to write the stats of executed queries
      * @param clientContextId
@@ -110,7 +131,8 @@ public interface IStatementExecutor {
      * @throws Exception
      */
     void compileAndExecute(IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery,
-            Stats stats, String clientContextId, IStatementExecutorContext ctx) throws Exception;
+            ResultMetadata outMetadata, Stats stats, String clientContextId, IStatementExecutorContext ctx)
+            throws Exception;
 
     /**
      * rewrites and compiles query into a hyracks job specifications

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorFactory.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorFactory.java
index 23365de..b244c0c 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorFactory.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorFactory.java
@@ -37,14 +37,14 @@ public interface IStatementExecutorFactory {
      *
      * @param statements
      *            Statements to execute
-     * @param conf
-     *            request configuration
+     * @param output
+     *            output and request configuration
      * @param compilationProvider
      *            provides query language related components
      * @param storageComponentProvider
      *            provides storage related components
      * @return an implementation of {@code IStatementExecutor} thaxt is used to execute the passed list of statements
      */
-    IStatementExecutor create(ICcApplicationContext appCtx, List<Statement> statements, SessionConfig conf,
+    IStatementExecutor create(ICcApplicationContext appCtx, List<Statement> statements, SessionOutput output,
             ILangCompilationProvider compilationProvider, IStorageComponentProvider storageComponentProvider);
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java
index a637e2f..cfd4e87 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java
@@ -18,13 +18,10 @@
  */
 package org.apache.asterix.translator;
 
-import java.io.PrintWriter;
+import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksAppendable;
-
 /**
  * SessionConfig captures several different parameters for controlling
  * the execution of an APIFramework call.
@@ -38,8 +35,10 @@ import org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksAppendab
  * execution output - LOSSLESS_JSON, CSV, etc.
  * <li>It allows you to specify output format-specific parameters.
  */
+public class SessionConfig implements Serializable {
+
+    private static final long serialVersionUID = 1L;
 
-public class SessionConfig {
     /**
      * Used to specify the output format for the primary execution.
      */
@@ -105,53 +104,25 @@ public class SessionConfig {
      */
     public static final String FORMAT_QUOTE_RECORD = "quote-record";
 
-    @FunctionalInterface
-    public interface ResultDecorator {
-        AlgebricksAppendable append(AlgebricksAppendable app) throws AlgebricksException;
-    }
-
-    @FunctionalInterface
-    public interface ResultAppender {
-        AlgebricksAppendable append(AlgebricksAppendable app, String str) throws AlgebricksException;
-    }
+    // Output format.
+    private final OutputFormat fmt;
 
     // Standard execution flags.
     private final boolean executeQuery;
     private final boolean generateJobSpec;
     private final boolean optimize;
 
-    // Output path for primary execution.
-    private final PrintWriter out;
-
-    // Output format.
-    private final OutputFormat fmt;
-
-    private final ResultDecorator preResultDecorator;
-    private final ResultDecorator postResultDecorator;
-    private final ResultAppender handleAppender;
-    private final ResultAppender statusAppender;
-
     // Flags.
     private final Map<String, Boolean> flags;
 
-    public SessionConfig(PrintWriter out, OutputFormat fmt, ResultDecorator preResultDecorator,
-            ResultDecorator postResultDecorator, ResultAppender handleAppender, ResultAppender statusAppender) {
-        this(out, fmt, preResultDecorator, postResultDecorator, handleAppender, statusAppender,
-                true, true, true);
-    }
-
-    public SessionConfig(PrintWriter out, OutputFormat fmt, boolean optimize, boolean executeQuery,
-            boolean generateJobSpec) {
-        this(out, fmt, null, null, null, null, optimize, executeQuery, generateJobSpec);
+    public SessionConfig(OutputFormat fmt) {
+        this(fmt, true, true, true);
     }
 
     /**
      * Create a SessionConfig object with all optional values set to defaults:
      * - All format flags set to "false".
      * - All out-of-band outputs set to "false".
-     *
-     * @param out
-     *            PrintWriter for execution output.
      * @param fmt
      *            Output format for execution output.
      * @param optimize
@@ -160,17 +131,9 @@ public class SessionConfig {
      *            Whether to execute the query or not.
      * @param generateJobSpec
      *            Whether to generate the Hyracks job specification (if
-     *            false, job cannot be executed).
      */
-    public SessionConfig(PrintWriter out, OutputFormat fmt, ResultDecorator preResultDecorator,
-            ResultDecorator postResultDecorator, ResultAppender handleAppender, ResultAppender statusAppender,
-            boolean optimize, boolean executeQuery, boolean generateJobSpec) {
-        this.out = out;
+    public SessionConfig(OutputFormat fmt, boolean optimize, boolean executeQuery, boolean generateJobSpec) {
         this.fmt = fmt;
-        this.preResultDecorator = preResultDecorator;
-        this.postResultDecorator = postResultDecorator;
-        this.handleAppender = handleAppender;
-        this.statusAppender = statusAppender;
         this.optimize = optimize;
         this.executeQuery = executeQuery;
         this.generateJobSpec = generateJobSpec;
@@ -178,35 +141,12 @@ public class SessionConfig {
     }
 
     /**
-     * Retrieve the PrintWriter to produce output to.
-     */
-    public PrintWriter out() {
-        return this.out;
-    }
-
-    /**
      * Retrieve the OutputFormat for this execution.
      */
     public OutputFormat fmt() {
         return this.fmt;
     }
 
-    public AlgebricksAppendable resultPrefix(AlgebricksAppendable app) throws AlgebricksException {
-        return this.preResultDecorator != null ? this.preResultDecorator.append(app) : app;
-    }
-
-    public AlgebricksAppendable resultPostfix(AlgebricksAppendable app) throws AlgebricksException {
-        return this.postResultDecorator != null ? this.postResultDecorator.append(app) : app;
-    }
-
-    public AlgebricksAppendable appendHandle(AlgebricksAppendable app, String handle) throws AlgebricksException {
-        return this.handleAppender != null ? this.handleAppender.append(app, handle) : app;
-    }
-
-    public AlgebricksAppendable appendStatus(AlgebricksAppendable app, String status) throws AlgebricksException {
-        return this.statusAppender != null ? this.statusAppender.append(app, status) : app;
-    }
-
     /**
      * Retrieve the value of the "execute query" flag.
      */

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionOutput.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionOutput.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionOutput.java
new file mode 100644
index 0000000..b559df8
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionOutput.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.translator;
+
+import java.io.PrintWriter;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksAppendable;
+
+public class SessionOutput {
+    private final SessionConfig config;
+
+    // Output path for primary execution.
+    private final PrintWriter out;
+
+    private final SessionOutput.ResultDecorator preResultDecorator;
+    private final SessionOutput.ResultDecorator postResultDecorator;
+    private final SessionOutput.ResultAppender handleAppender;
+    private final SessionOutput.ResultAppender statusAppender;
+
+    public SessionOutput(SessionConfig config, PrintWriter out) {
+        this(config, out, null, null, null, null);
+    }
+
+    public SessionOutput(SessionConfig config, PrintWriter out, ResultDecorator preResultDecorator,
+            ResultDecorator postResultDecorator, ResultAppender handleAppender, ResultAppender statusAppender) {
+        this.config = config;
+        this.out = out;
+        this.preResultDecorator = preResultDecorator;
+        this.postResultDecorator = postResultDecorator;
+        this.handleAppender = handleAppender;
+        this.statusAppender = statusAppender;
+    }
+
+    /**
+     * Retrieve the PrintWriter to produce output to.
+     */
+    public PrintWriter out() {
+        return this.out;
+    }
+
+    public AlgebricksAppendable resultPrefix(AlgebricksAppendable app) throws AlgebricksException {
+        return this.preResultDecorator != null ? this.preResultDecorator.append(app) : app;
+    }
+
+    public AlgebricksAppendable resultPostfix(AlgebricksAppendable app) throws AlgebricksException {
+        return this.postResultDecorator != null ? this.postResultDecorator.append(app) : app;
+    }
+
+    public AlgebricksAppendable appendHandle(AlgebricksAppendable app, String handle) throws AlgebricksException {
+        return this.handleAppender != null ? this.handleAppender.append(app, handle) : app;
+    }
+
+    public AlgebricksAppendable appendStatus(AlgebricksAppendable app, String status) throws AlgebricksException {
+        return this.statusAppender != null ? this.statusAppender.append(app, status) : app;
+    }
+
+    public SessionConfig config() {
+        return config;
+    }
+
+    @FunctionalInterface
+    public interface ResultDecorator {
+        AlgebricksAppendable append(AlgebricksAppendable app) throws AlgebricksException;
+    }
+
+    @FunctionalInterface
+    public interface ResultAppender {
+        AlgebricksAppendable append(AlgebricksAppendable app, String str) throws AlgebricksException;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index b12935d..583302b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -67,6 +67,7 @@ import org.apache.asterix.transaction.management.service.transaction.JobIdFactor
 import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement;
 import org.apache.asterix.translator.IStatementExecutor.Stats;
 import org.apache.asterix.translator.SessionConfig;
+import org.apache.asterix.translator.SessionOutput;
 import org.apache.asterix.utils.ResourceUtils;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
@@ -152,31 +153,33 @@ public class APIFramework {
         }
     }
 
-    private void printPlanPrefix(SessionConfig conf, String planName) {
-        if (conf.is(SessionConfig.FORMAT_HTML)) {
-            conf.out().println("<h4>" + planName + ":</h4>");
-            conf.out().println("<pre>");
+    private void printPlanPrefix(SessionOutput output, String planName) {
+        if (output.config().is(SessionConfig.FORMAT_HTML)) {
+            output.out().println("<h4>" + planName + ":</h4>");
+            output.out().println("<pre>");
         } else {
-            conf.out().println("----------" + planName + ":");
+            output.out().println("----------" + planName + ":");
         }
     }
 
-    private void printPlanPostfix(SessionConfig conf) {
-        if (conf.is(SessionConfig.FORMAT_HTML)) {
-            conf.out().println("</pre>");
+    private void printPlanPostfix(SessionOutput output) {
+        if (output.config().is(SessionConfig.FORMAT_HTML)) {
+            output.out().println("</pre>");
         }
     }
 
     public Pair<IReturningStatement, Integer> reWriteQuery(List<FunctionDecl> declaredFunctions,
-            MetadataProvider metadataProvider, IReturningStatement q, SessionConfig conf) throws CompilationException {
+            MetadataProvider metadataProvider, IReturningStatement q, SessionOutput output)
+            throws CompilationException {
         if (q == null) {
             return null;
         }
+        SessionConfig conf = output.config();
         if (!conf.is(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS) && conf.is(SessionConfig.OOB_EXPR_TREE)) {
-            conf.out().println();
-            printPlanPrefix(conf, "Expression tree");
-            q.accept(astPrintVisitorFactory.createLangVisitor(conf.out()), 0);
-            printPlanPostfix(conf);
+            output.out().println();
+            printPlanPrefix(output, "Expression tree");
+            q.accept(astPrintVisitorFactory.createLangVisitor(output.out()), 0);
+            printPlanPostfix(output);
         }
         IQueryRewriter rw = rewriterFactory.createQueryRewriter();
         rw.rewrite(declaredFunctions, q, metadataProvider, new LangRewritingContext(q.getVarCounter()));
@@ -184,17 +187,18 @@ public class APIFramework {
     }
 
     public JobSpecification compileQuery(IClusterInfoCollector clusterInfoCollector, MetadataProvider metadataProvider,
-            Query rwQ, int varCounter, String outputDatasetName, SessionConfig conf, ICompiledDmlStatement statement)
+            Query rwQ, int varCounter, String outputDatasetName, SessionOutput output, ICompiledDmlStatement statement)
             throws AlgebricksException, RemoteException, ACIDException {
 
+        SessionConfig conf = output.config();
         if (!conf.is(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS) && conf.is(SessionConfig.OOB_REWRITTEN_EXPR_TREE)) {
-            conf.out().println();
+            output.out().println();
 
-            printPlanPrefix(conf, "Rewritten expression tree");
+            printPlanPrefix(output, "Rewritten expression tree");
             if (rwQ != null) {
-                rwQ.accept(astPrintVisitorFactory.createLangVisitor(conf.out()), 0);
+                rwQ.accept(astPrintVisitorFactory.createLangVisitor(output.out()), 0);
             }
-            printPlanPostfix(conf);
+            printPlanPostfix(output);
         }
 
         org.apache.asterix.common.transactions.JobId asterixJobId = JobIdFactory.generateJobId();
@@ -211,14 +215,14 @@ public class APIFramework {
         }
 
         if (!conf.is(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS) && conf.is(SessionConfig.OOB_LOGICAL_PLAN)) {
-            conf.out().println();
+            output.out().println();
 
-            printPlanPrefix(conf, "Logical plan");
+            printPlanPrefix(output, "Logical plan");
             if (rwQ != null || (statement != null && statement.getKind() == Statement.Kind.LOAD)) {
-                LogicalOperatorPrettyPrintVisitor pvisitor = new LogicalOperatorPrettyPrintVisitor(conf.out());
+                LogicalOperatorPrettyPrintVisitor pvisitor = new LogicalOperatorPrettyPrintVisitor(output.out());
                 PlanPrettyPrinter.printPlan(plan, pvisitor, 0);
             }
-            printPlanPostfix(conf);
+            printPlanPostfix(output);
         }
         CompilerProperties compilerProperties = metadataProvider.getApplicationContext().getCompilerProperties();
         int frameSize = compilerProperties.getFrameSize();
@@ -264,15 +268,16 @@ public class APIFramework {
             if (conf.is(SessionConfig.OOB_OPTIMIZED_LOGICAL_PLAN)) {
                 if (conf.is(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS)) {
                     // For Optimizer tests.
-                    AlgebricksAppendable buffer = new AlgebricksAppendable(conf.out());
+                    AlgebricksAppendable buffer = new AlgebricksAppendable(output.out());
                     PlanPrettyPrinter.printPhysicalOps(plan, buffer, 0);
                 } else {
-                    printPlanPrefix(conf, "Optimized logical plan");
+                    printPlanPrefix(output, "Optimized logical plan");
                     if (rwQ != null || (statement != null && statement.getKind() == Statement.Kind.LOAD)) {
-                        LogicalOperatorPrettyPrintVisitor pvisitor = new LogicalOperatorPrettyPrintVisitor(conf.out());
+                        LogicalOperatorPrettyPrintVisitor pvisitor =
+                                new LogicalOperatorPrettyPrintVisitor(output.out());
                         PlanPrettyPrinter.printPlan(plan, pvisitor, 0);
                     }
-                    printPlanPostfix(conf);
+                    printPlanPostfix(output);
                 }
             }
         }
@@ -280,7 +285,7 @@ public class APIFramework {
             try {
                 LogicalOperatorPrettyPrintVisitor pvisitor = new LogicalOperatorPrettyPrintVisitor();
                 PlanPrettyPrinter.printPlan(plan, pvisitor, 0);
-                ResultUtil.printResults(metadataProvider.getApplicationContext(), pvisitor.get().toString(), conf,
+                ResultUtil.printResults(metadataProvider.getApplicationContext(), pvisitor.get().toString(), output,
                         new Stats(), null);
                 return null;
             } catch (IOException e) {
@@ -336,17 +341,17 @@ public class APIFramework {
         }
 
         if (conf.is(SessionConfig.OOB_HYRACKS_JOB)) {
-            printPlanPrefix(conf, "Hyracks job");
+            printPlanPrefix(output, "Hyracks job");
             if (rwQ != null) {
                 try {
-                    conf.out().println(
+                    output.out().println(
                             new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(spec.toJSON()));
                 } catch (IOException e) {
                     throw new AlgebricksException(e);
                 }
-                conf.out().println(spec.getUserConstraints());
+                output.out().println(spec.getUserConstraints());
             }
-            printPlanPostfix(conf);
+            printPlanPostfix(output);
         }
         return spec;
     }
@@ -459,9 +464,7 @@ public class APIFramework {
 
     // Gets the frame limit.
     private static int getFrameLimit(String parameterName, String parameter, long memBudgetInConfiguration,
-            int frameSize,
-            int minFrameLimit)
-            throws AlgebricksException {
+            int frameSize, int minFrameLimit) throws AlgebricksException {
         IOptionType<Long> longBytePropertyInterpreter = OptionTypes.LONG_BYTE_UNIT;
         long memBudget = parameter == null ? memBudgetInConfiguration : longBytePropertyInterpreter.parse(parameter);
         int frameLimit = (int) (memBudget / frameSize);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java
index a9c4b39..a4e72f7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java
@@ -26,7 +26,7 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.asterix.app.result.ResultReader;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
@@ -35,7 +35,7 @@ import org.apache.hyracks.client.dataset.HyracksDataset;
 import org.apache.hyracks.http.server.AbstractServlet;
 
 public class AbstractQueryApiServlet extends AbstractServlet {
-    protected final ICcApplicationContext appCtx;
+    protected final IApplicationContext appCtx;
 
     public enum ResultFields {
         REQUEST_ID("requestID"),
@@ -93,7 +93,7 @@ public class AbstractQueryApiServlet extends AbstractServlet {
         }
     }
 
-    AbstractQueryApiServlet(ICcApplicationContext appCtx, ConcurrentMap<String, Object> ctx, String[] paths) {
+    AbstractQueryApiServlet(IApplicationContext appCtx, ConcurrentMap<String, Object> ctx, String[] paths) {
         super(ctx, paths);
         this.appCtx = appCtx;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
index 3384332..7874aa3 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
@@ -50,6 +50,7 @@ import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.asterix.translator.IStatementExecutorFactory;
 import org.apache.asterix.translator.SessionConfig;
 import org.apache.asterix.translator.SessionConfig.OutputFormat;
+import org.apache.asterix.translator.SessionOutput;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.dataset.IHyracksDataset;
 import org.apache.hyracks.client.dataset.HyracksDataset;
@@ -137,19 +138,20 @@ public class ApiServlet extends AbstractServlet {
             }
             IParser parser = parserFactory.createParser(query);
             List<Statement> aqlStatements = parser.parse();
-            SessionConfig sessionConfig = new SessionConfig(out, format, true, isSet(executeQuery), true);
+            SessionConfig sessionConfig = new SessionConfig(format, true, isSet(executeQuery), true);
             sessionConfig.set(SessionConfig.FORMAT_HTML, true);
             sessionConfig.set(SessionConfig.FORMAT_CSV_HEADER, csvAndHeader);
             sessionConfig.set(SessionConfig.FORMAT_WRAPPER_ARRAY, isSet(wrapperArray));
             sessionConfig.setOOBData(isSet(printExprParam), isSet(printRewrittenExprParam),
                     isSet(printLogicalPlanParam), isSet(printOptimizedLogicalPlanParam), isSet(printJob));
+            SessionOutput sessionOutput = new SessionOutput(sessionConfig, out);
             MetadataManager.INSTANCE.init();
-            IStatementExecutor translator = statementExectorFactory.create(appCtx, aqlStatements, sessionConfig,
+            IStatementExecutor translator = statementExectorFactory.create(appCtx, aqlStatements, sessionOutput,
                     compilationProvider, componentProvider);
             double duration;
             long startTime = System.currentTimeMillis();
             translator.compileAndExecute(hcc, hds, IStatementExecutor.ResultDelivery.IMMEDIATE,
-                    new IStatementExecutor.Stats());
+                    null, new IStatementExecutor.Stats());
             long endTime = System.currentTimeMillis();
             duration = (endTime - startTime) / 1000.00;
             out.println(HTML_STATEMENT_SEPARATOR);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
new file mode 100644
index 0000000..2b70685
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.api.http.server;
+
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.asterix.algebra.base.ILangExtension;
+import org.apache.asterix.app.message.ExecuteStatementRequestMessage;
+import org.apache.asterix.app.message.ExecuteStatementResponseMessage;
+import org.apache.asterix.app.result.ResultReader;
+import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.common.messaging.api.MessageFuture;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.SessionOutput;
+import org.apache.commons.lang3.tuple.Triple;
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.job.JobId;
+
+/**
+ * Query service servlet that can run on NC nodes.
+ * Delegates query execution to CC, then serves the result.
+ */
+public class NCQueryServiceServlet extends QueryServiceServlet {
+    public NCQueryServiceServlet(ConcurrentMap<String, Object> ctx, String[] paths, IApplicationContext appCtx,
+            ILangExtension.Language queryLanguage) {
+        super(ctx, paths, appCtx, queryLanguage, null, null, null);
+    }
+
+    @Override
+    protected void executeStatement(String statementsText, SessionOutput sessionOutput,
+            IStatementExecutor.ResultDelivery delivery, IStatementExecutor.Stats stats, RequestParameters param,
+            String handleUrl, long[] outExecStartEnd) throws Exception {
+        // Running on NC -> send 'execute' message to CC
+        INCServiceContext ncCtx = (INCServiceContext) serviceCtx;
+        INCMessageBroker ncMb = (INCMessageBroker) ncCtx.getMessageBroker();
+        IStatementExecutor.ResultDelivery ccDelivery = delivery == IStatementExecutor.ResultDelivery.IMMEDIATE
+                ? IStatementExecutor.ResultDelivery.DEFERRED : delivery;
+        ExecuteStatementResponseMessage responseMsg;
+        MessageFuture responseFuture = ncMb.registerMessageFuture();
+        try {
+            ExecuteStatementRequestMessage requestMsg =
+                    new ExecuteStatementRequestMessage(ncCtx.getNodeId(), responseFuture.getFutureId(), queryLanguage,
+                            statementsText, sessionOutput.config(), ccDelivery, param.clientContextID, handleUrl);
+            outExecStartEnd[0] = System.nanoTime();
+            ncMb.sendMessageToCC(requestMsg);
+            responseMsg = (ExecuteStatementResponseMessage) responseFuture.get(
+                    ExecuteStatementResponseMessage.DEFAULT_TIMEOUT_MILLIS, java.util.concurrent.TimeUnit.MILLISECONDS);
+            outExecStartEnd[1] = System.nanoTime();
+        } finally {
+            ncMb.deregisterMessageFuture(responseFuture.getFutureId());
+        }
+
+        Throwable err = responseMsg.getError();
+        if (err != null) {
+            if (err instanceof Error) {
+                throw (Error) err;
+            } else if (err instanceof Exception) {
+                throw (Exception) err;
+            } else {
+                throw new Exception(err.toString(), err);
+            }
+        }
+
+        IStatementExecutor.ResultMetadata resultMetadata = responseMsg.getMetadata();
+        if (delivery == IStatementExecutor.ResultDelivery.IMMEDIATE && !resultMetadata.getResultSets().isEmpty()) {
+            for (Triple<JobId, ResultSetId, ARecordType> rsmd : resultMetadata.getResultSets()) {
+                ResultReader resultReader = new ResultReader(getHyracksDataset(), rsmd.getLeft(), rsmd.getMiddle());
+                ResultUtil.printResults(appCtx, resultReader, sessionOutput, stats, rsmd.getRight());
+            }
+        } else {
+            sessionOutput.out().append(responseMsg.getResult());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
index 401f55e..42e23ba 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
@@ -25,9 +25,9 @@ import java.util.logging.Logger;
 
 import org.apache.asterix.app.result.ResultHandle;
 import org.apache.asterix.app.result.ResultReader;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.translator.IStatementExecutor.Stats;
-import org.apache.asterix.translator.SessionConfig;
+import org.apache.asterix.translator.SessionOutput;
 import org.apache.hyracks.api.dataset.DatasetJobRecord;
 import org.apache.hyracks.api.dataset.IHyracksDataset;
 import org.apache.hyracks.api.exceptions.ErrorCode;
@@ -41,7 +41,7 @@ import io.netty.handler.codec.http.HttpResponseStatus;
 public class QueryResultApiServlet extends AbstractQueryApiServlet {
     private static final Logger LOGGER = Logger.getLogger(QueryResultApiServlet.class.getName());
 
-    public QueryResultApiServlet(ConcurrentMap<String, Object> ctx, String[] paths, ICcApplicationContext appCtx) {
+    public QueryResultApiServlet(ConcurrentMap<String, Object> ctx, String[] paths, IApplicationContext appCtx) {
         super(appCtx, ctx, paths);
     }
 
@@ -94,8 +94,8 @@ public class QueryResultApiServlet extends AbstractQueryApiServlet {
             // way to send the same OutputFormat value here as was
             // originally determined there. Need to save this value on
             // some object that we can obtain here.
-            SessionConfig sessionConfig = RestApiServlet.initResponse(request, response);
-            ResultUtil.printResults(appCtx, resultReader, sessionConfig, new Stats(), null);
+            SessionOutput sessionOutput = RestApiServlet.initResponse(request, response);
+            ResultUtil.printResults(appCtx, resultReader, sessionOutput, new Stats(), null);
         } catch (HyracksDataException e) {
             final int errorCode = e.getErrorCode();
             if (ErrorCode.NO_RESULTSET == errorCode) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index 20bffc4..c45f24a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -27,8 +27,9 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.api.http.ctx.StatementExecutorContext;
+import org.apache.asterix.algebra.base.ILangExtension;
 import org.apache.asterix.api.http.servlet.ServletConstants;
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.api.IClusterManagementWork;
 import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.context.IStorageComponentProvider;
@@ -46,8 +47,8 @@ import org.apache.asterix.translator.IStatementExecutor.Stats;
 import org.apache.asterix.translator.IStatementExecutorContext;
 import org.apache.asterix.translator.IStatementExecutorFactory;
 import org.apache.asterix.translator.SessionConfig;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksAppendable;
+import org.apache.asterix.translator.SessionOutput;
+import org.apache.hyracks.api.application.IServiceContext;
 import org.apache.hyracks.http.api.IServletRequest;
 import org.apache.hyracks.http.api.IServletResponse;
 import org.apache.hyracks.http.server.utils.HttpUtil;
@@ -66,19 +67,23 @@ import io.netty.handler.codec.http.HttpResponseStatus;
 
 public class QueryServiceServlet extends AbstractQueryApiServlet {
     private static final Logger LOGGER = Logger.getLogger(QueryServiceServlet.class.getName());
+    protected final ILangExtension.Language queryLanguage;
     private final ILangCompilationProvider compilationProvider;
     private final IStatementExecutorFactory statementExecutorFactory;
     private final IStorageComponentProvider componentProvider;
-    private final IStatementExecutorContext queryCtx = new StatementExecutorContext();
+    private final IStatementExecutorContext queryCtx;
+    protected final IServiceContext serviceCtx;
 
-    public QueryServiceServlet(ConcurrentMap<String, Object> ctx, String[] paths, ICcApplicationContext appCtx,
-            ILangCompilationProvider compilationProvider, IStatementExecutorFactory statementExecutorFactory,
-            IStorageComponentProvider componentProvider) {
+    public QueryServiceServlet(ConcurrentMap<String, Object> ctx, String[] paths, IApplicationContext appCtx,
+            ILangExtension.Language queryLanguage, ILangCompilationProvider compilationProvider,
+            IStatementExecutorFactory statementExecutorFactory, IStorageComponentProvider componentProvider) {
         super(appCtx, ctx, paths);
+        this.queryLanguage = queryLanguage;
         this.compilationProvider = compilationProvider;
         this.statementExecutorFactory = statementExecutorFactory;
         this.componentProvider = componentProvider;
-        ctx.put(ServletConstants.RUNNING_QUERIES_ATTR, queryCtx);
+        this.queryCtx = (IStatementExecutorContext) ctx.get(ServletConstants.RUNNING_QUERIES_ATTR);
+        this.serviceCtx = (IServiceContext) ctx.get(ServletConstants.SERVICE_CONTEXT_ATTR);
     }
 
     @Override
@@ -235,40 +240,22 @@ public class QueryServiceServlet extends AbstractQueryApiServlet {
         return SessionConfig.OutputFormat.CLEAN_JSON;
     }
 
-    private static SessionConfig createSessionConfig(RequestParameters param, String handleUrl,
+    private static SessionOutput createSessionOutput(RequestParameters param, String handleUrl,
             PrintWriter resultWriter) {
-        SessionConfig.ResultDecorator resultPrefix = new SessionConfig.ResultDecorator() {
-            int resultNo = -1;
-
-            @Override
-            public AlgebricksAppendable append(AlgebricksAppendable app) throws AlgebricksException {
-                app.append("\t\"");
-                app.append(ResultFields.RESULTS.str());
-                if (resultNo >= 0) {
-                    app.append('-').append(String.valueOf(resultNo));
-                }
-                ++resultNo;
-                app.append("\": ");
-                return app;
-            }
-        };
-
-        SessionConfig.ResultDecorator resultPostfix = app -> app.append("\t,\n");
-        SessionConfig.ResultAppender appendHandle = (app, handle) -> app.append("\t\"")
-                .append(ResultFields.HANDLE.str()).append("\": \"").append(handleUrl).append(handle).append("\",\n");
-        SessionConfig.ResultAppender appendStatus = (app, status) -> app.append("\t\"")
-                .append(ResultFields.STATUS.str()).append("\": \"").append(status).append("\",\n");
+        SessionOutput.ResultDecorator resultPrefix = ResultUtil.createPreResultDecorator();
+        SessionOutput.ResultDecorator resultPostfix = ResultUtil.createPostResultDecorator();
+        SessionOutput.ResultAppender appendHandle = ResultUtil.createResultHandleAppender(handleUrl);
+        SessionOutput.ResultAppender appendStatus = ResultUtil.createResultStatusAppender();
 
         SessionConfig.OutputFormat format = getFormat(param.format);
-        SessionConfig sessionConfig =
-                new SessionConfig(resultWriter, format, resultPrefix, resultPostfix, appendHandle, appendStatus);
+        SessionConfig sessionConfig = new SessionConfig(format);
         sessionConfig.set(SessionConfig.FORMAT_WRAPPER_ARRAY, true);
         sessionConfig.set(SessionConfig.FORMAT_INDENT_JSON, param.pretty);
         sessionConfig.set(SessionConfig.FORMAT_QUOTE_RECORD,
                 format != SessionConfig.OutputFormat.CLEAN_JSON && format != SessionConfig.OutputFormat.LOSSLESS_JSON);
         sessionConfig.set(SessionConfig.FORMAT_CSV_HEADER, format == SessionConfig.OutputFormat.CSV
                 && "present".equals(getParameterValue(param.format, Attribute.HEADER.str())));
-        return sessionConfig;
+        return new SessionOutput(sessionConfig, resultWriter, resultPrefix, resultPostfix, appendHandle, appendStatus);
     }
 
     private static void printClientContextID(PrintWriter pw, RequestParameters params) {
@@ -406,13 +393,13 @@ public class QueryServiceServlet extends AbstractQueryApiServlet {
         ResultDelivery delivery = parseResultDelivery(param.mode);
 
         String handleUrl = getHandleUrl(param.host, param.path, delivery);
-        SessionConfig sessionConfig = createSessionConfig(param, handleUrl, resultWriter);
+        SessionOutput sessionOutput = createSessionOutput(param, handleUrl, resultWriter);
+        SessionConfig sessionConfig = sessionOutput.config();
         HttpUtil.setContentType(response, HttpUtil.ContentType.APPLICATION_JSON, HttpUtil.Encoding.UTF8);
 
         HttpResponseStatus status = HttpResponseStatus.OK;
         Stats stats = new Stats();
-        long execStart = -1;
-        long execEnd = -1;
+        long[] execStartEnd = new long[] { -1, -1 };
 
         resultWriter.print("{\n");
         printRequestId(resultWriter);
@@ -420,46 +407,33 @@ public class QueryServiceServlet extends AbstractQueryApiServlet {
         printSignature(resultWriter);
         printType(resultWriter, sessionConfig);
         try {
-            final IClusterManagementWork.ClusterState clusterState = ClusterStateManager.INSTANCE.getState();
-            if (clusterState != IClusterManagementWork.ClusterState.ACTIVE) {
-                // using a plain IllegalStateException here to get into the right catch clause for a 500
-                throw new IllegalStateException("Cannot execute request, cluster is " + clusterState);
-            }
             if (param.statement == null || param.statement.isEmpty()) {
                 throw new AsterixException("Empty request, no statement provided");
             }
-            IParser parser = compilationProvider.getParserFactory().createParser(param.statement + ";");
-            List<Statement> statements = parser.parse();
-            MetadataManager.INSTANCE.init();
-            IStatementExecutor translator =
-                    statementExecutorFactory.create(appCtx, statements, sessionConfig, compilationProvider,
-                            componentProvider);
-            execStart = System.nanoTime();
-            translator.compileAndExecute(getHyracksClientConnection(), getHyracksDataset(), delivery, stats,
-                    param.clientContextID, queryCtx);
-            execEnd = System.nanoTime();
+            String statementsText = param.statement + ";";
+            executeStatement(statementsText, sessionOutput, delivery, stats, param, handleUrl, execStartEnd);
             if (ResultDelivery.IMMEDIATE == delivery || ResultDelivery.DEFERRED == delivery) {
-                ResultUtil.printStatus(sessionConfig, ResultStatus.SUCCESS);
+                ResultUtil.printStatus(sessionOutput, ResultStatus.SUCCESS);
             }
         } catch (AsterixException | TokenMgrError | org.apache.asterix.aqlplus.parser.TokenMgrError pe) {
             GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, pe.getMessage(), pe);
             ResultUtil.printError(resultWriter, pe);
-            ResultUtil.printStatus(sessionConfig, ResultStatus.FATAL);
+            ResultUtil.printStatus(sessionOutput, ResultStatus.FATAL);
             status = HttpResponseStatus.BAD_REQUEST;
         } catch (Exception e) {
-            GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, e.getMessage(), e);
+            GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, e.toString(), e);
             ResultUtil.printError(resultWriter, e);
-            ResultUtil.printStatus(sessionConfig, ResultStatus.FATAL);
+            ResultUtil.printStatus(sessionOutput, ResultStatus.FATAL);
             status = HttpResponseStatus.INTERNAL_SERVER_ERROR;
         } finally {
-            if (execStart == -1) {
-                execEnd = -1;
-            } else if (execEnd == -1) {
-                execEnd = System.nanoTime();
+            if (execStartEnd[0] == -1) {
+                execStartEnd[1] = -1;
+            } else if (execStartEnd[1] == -1) {
+                execStartEnd[1] = System.nanoTime();
             }
         }
-        printMetrics(resultWriter, System.nanoTime() - elapsedStart, execEnd - execStart, stats.getCount(),
-                stats.getSize());
+        printMetrics(resultWriter, System.nanoTime() - elapsedStart, execStartEnd[1] - execStartEnd[0],
+                stats.getCount(), stats.getSize());
         resultWriter.print("}\n");
         resultWriter.flush();
         String result = stringWriter.toString();
@@ -472,4 +446,23 @@ public class QueryServiceServlet extends AbstractQueryApiServlet {
             LOGGER.warning("Error flushing output writer");
         }
     }
+
+    protected void executeStatement(String statementsText, SessionOutput sessionOutput, ResultDelivery delivery,
+            IStatementExecutor.Stats stats, RequestParameters param, String handleUrl, long[] outExecStartEnd)
+            throws Exception {
+        IClusterManagementWork.ClusterState clusterState = ClusterStateManager.INSTANCE.getState();
+        if (clusterState != IClusterManagementWork.ClusterState.ACTIVE) {
+            // using a plain IllegalStateException here to get into the right catch clause for a 500
+            throw new IllegalStateException("Cannot execute request, cluster is " + clusterState);
+        }
+        IParser parser = compilationProvider.getParserFactory().createParser(statementsText);
+        List<Statement> statements = parser.parse();
+        MetadataManager.INSTANCE.init();
+        IStatementExecutor translator = statementExecutorFactory.create((ICcApplicationContext) appCtx, statements,
+                sessionOutput, compilationProvider, componentProvider);
+        outExecStartEnd[0] = System.nanoTime();
+        translator.compileAndExecute(getHyracksClientConnection(), getHyracksDataset(), delivery, null, stats,
+                param.clientContextID, queryCtx);
+        outExecStartEnd[1] = System.nanoTime();
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java
index d0c574e..71dddc0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java
@@ -29,7 +29,7 @@ import java.util.logging.Logger;
 
 import org.apache.asterix.app.result.ResultHandle;
 import org.apache.asterix.app.result.ResultReader;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.hyracks.api.dataset.DatasetJobRecord;
 import org.apache.hyracks.api.dataset.IHyracksDataset;
 import org.apache.hyracks.http.api.IServletRequest;
@@ -41,7 +41,7 @@ import io.netty.handler.codec.http.HttpResponseStatus;
 public class QueryStatusApiServlet extends AbstractQueryApiServlet {
     private static final Logger LOGGER = Logger.getLogger(QueryStatusApiServlet.class.getName());
 
-    public QueryStatusApiServlet(ConcurrentMap<String, Object> ctx, String[] paths, ICcApplicationContext appCtx) {
+    public QueryStatusApiServlet(ConcurrentMap<String, Object> ctx, String[] paths, IApplicationContext appCtx) {
         super(appCtx, ctx, paths);
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
index e339ba9..6b1e408 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
@@ -45,6 +45,7 @@ import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
 import org.apache.asterix.translator.IStatementExecutorFactory;
 import org.apache.asterix.translator.SessionConfig;
 import org.apache.asterix.translator.SessionConfig.OutputFormat;
+import org.apache.asterix.translator.SessionOutput;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.dataset.IHyracksDataset;
 import org.apache.hyracks.client.dataset.HyracksDataset;
@@ -84,7 +85,7 @@ public abstract class RestApiServlet extends AbstractServlet {
      * SessionConfig with the appropriate output writer and output-format
      * based on the Accept: header and other servlet parameters.
      */
-    static SessionConfig initResponse(IServletRequest request, IServletResponse response) throws IOException {
+    static SessionOutput initResponse(IServletRequest request, IServletResponse response) throws IOException {
         HttpUtil.setContentType(response, HttpUtil.ContentType.TEXT_PLAIN, HttpUtil.Encoding.UTF8);
         // CLEAN_JSON output is the default; most generally useful for a
         // programmatic HTTP API
@@ -114,9 +115,9 @@ public abstract class RestApiServlet extends AbstractServlet {
             format = OutputFormat.LOSSLESS_JSON;
         }
 
-        SessionConfig.ResultAppender appendHandle = (app, handle) -> app.append("{ \"").append("handle")
+        SessionOutput.ResultAppender appendHandle = (app, handle) -> app.append("{ \"").append("handle")
                 .append("\":" + " \"").append(handle).append("\" }");
-        SessionConfig sessionConfig = new SessionConfig(response.writer(), format, null, null, appendHandle, null);
+        SessionConfig sessionConfig = new SessionConfig(format);
 
         // If it's JSON or ADM, check for the "wrapper-array" flag. Default is
         // "true" for JSON and "false" for ADM. (Not applicable for CSV.)
@@ -152,7 +153,7 @@ public abstract class RestApiServlet extends AbstractServlet {
             default:
                 throw new IOException("Unknown format " + format);
         }
-        return sessionConfig;
+        return new SessionOutput(sessionConfig, response.writer(), null, null, appendHandle, null);
     }
 
     @Override
@@ -171,9 +172,9 @@ public abstract class RestApiServlet extends AbstractServlet {
             // enable cross-origin resource sharing
             response.setHeader("Access-Control-Allow-Origin", "*");
             response.setHeader("Access-Control-Allow-Headers", "Origin, X-Requested-With, Content-Type, Accept");
-            SessionConfig sessionConfig = initResponse(request, response);
+            SessionOutput sessionOutput = initResponse(request, response);
             QueryTranslator.ResultDelivery resultDelivery = whichResultDelivery(request);
-            doHandle(response, query, sessionConfig, resultDelivery);
+            doHandle(response, query, sessionOutput, resultDelivery);
         } catch (Exception e) {
             response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
             LOGGER.log(Level.WARNING, "Failure handling request", e);
@@ -181,7 +182,7 @@ public abstract class RestApiServlet extends AbstractServlet {
         }
     }
 
-    private void doHandle(IServletResponse response, String query, SessionConfig sessionConfig,
+    private void doHandle(IServletResponse response, String query, SessionOutput sessionOutput,
             ResultDelivery resultDelivery) throws JsonProcessingException {
         try {
             response.setStatus(HttpResponseStatus.OK);
@@ -201,20 +202,20 @@ public abstract class RestApiServlet extends AbstractServlet {
             List<Statement> aqlStatements = parser.parse();
             validate(aqlStatements);
             MetadataManager.INSTANCE.init();
-            IStatementExecutor translator = statementExecutorFactory.create(appCtx, aqlStatements, sessionConfig,
+            IStatementExecutor translator = statementExecutorFactory.create(appCtx, aqlStatements, sessionOutput,
                     compilationProvider, componentProvider);
-            translator.compileAndExecute(hcc, hds, resultDelivery, new IStatementExecutor.Stats());
+            translator.compileAndExecute(hcc, hds, resultDelivery, null, new IStatementExecutor.Stats());
         } catch (AsterixException | TokenMgrError | org.apache.asterix.aqlplus.parser.TokenMgrError pe) {
             response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
             GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, pe.getMessage(), pe);
             String errorMessage = ResultUtil.buildParseExceptionMessage(pe, query);
             ObjectNode errorResp =
                     ResultUtil.getErrorResponse(2, errorMessage, "", ResultUtil.extractFullStackTrace(pe));
-            sessionConfig.out().write(new ObjectMapper().writeValueAsString(errorResp));
+            sessionOutput.out().write(new ObjectMapper().writeValueAsString(errorResp));
         } catch (Exception e) {
             GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, e.getMessage(), e);
             response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
-            ResultUtil.apiErrorHandler(sessionConfig.out(), e);
+            ResultUtil.apiErrorHandler(sessionOutput.out(), e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ResultUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ResultUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ResultUtil.java
index fe6fa89..3bcd670 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ResultUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ResultUtil.java
@@ -36,10 +36,10 @@ import java.util.stream.Stream;
 import org.apache.asterix.app.result.ResultHandle;
 import org.apache.asterix.app.result.ResultPrinter;
 import org.apache.asterix.app.result.ResultReader;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.translator.IStatementExecutor.Stats;
-import org.apache.asterix.translator.SessionConfig;
+import org.apache.asterix.translator.SessionOutput;
 import org.apache.http.ParseException;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksAppendable;
@@ -77,29 +77,29 @@ public class ResultUtil {
         return escaped;
     }
 
-    public static void printResults(ICcApplicationContext appCtx, ResultReader resultReader, SessionConfig conf,
+    public static void printResults(IApplicationContext appCtx, ResultReader resultReader, SessionOutput output,
             Stats stats, ARecordType recordType) throws HyracksDataException {
-        new ResultPrinter(appCtx, conf, stats, recordType).print(resultReader);
+        new ResultPrinter(appCtx, output, stats, recordType).print(resultReader);
     }
 
-    public static void printResults(ICcApplicationContext appCtx, String record, SessionConfig conf, Stats stats,
+    public static void printResults(IApplicationContext appCtx, String record, SessionOutput output, Stats stats,
             ARecordType recordType) throws HyracksDataException {
-        new ResultPrinter(appCtx, conf, stats, recordType).print(record);
+        new ResultPrinter(appCtx, output, stats, recordType).print(record);
     }
 
-    public static void printResultHandle(SessionConfig conf, ResultHandle handle) throws HyracksDataException {
+    public static void printResultHandle(SessionOutput output, ResultHandle handle) throws HyracksDataException {
         try {
-            final AlgebricksAppendable app = new AlgebricksAppendable(conf.out());
-            conf.appendHandle(app, handle.toString());
+            final AlgebricksAppendable app = new AlgebricksAppendable(output.out());
+            output.appendHandle(app, handle.toString());
         } catch (AlgebricksException e) {
             LOGGER.warn("error printing handle", e);
         }
     }
 
-    public static void printStatus(SessionConfig conf, AbstractQueryApiServlet.ResultStatus rs) {
+    public static void printStatus(SessionOutput output, AbstractQueryApiServlet.ResultStatus rs) {
         try {
-            final AlgebricksAppendable app = new AlgebricksAppendable(conf.out());
-            conf.appendStatus(app, rs.str());
+            final AlgebricksAppendable app = new AlgebricksAppendable(output.out());
+            output.appendStatus(app, rs.str());
         } catch (AlgebricksException e) {
             LOGGER.warn("error printing status", e);
         }
@@ -318,4 +318,35 @@ public class ResultUtil {
         return errorTemplate;
     }
 
+    public static SessionOutput.ResultDecorator createPreResultDecorator() {
+        return new SessionOutput.ResultDecorator() {
+            int resultNo = -1;
+
+            @Override
+            public AlgebricksAppendable append(AlgebricksAppendable app) throws AlgebricksException {
+                app.append("\t\"");
+                app.append(AbstractQueryApiServlet.ResultFields.RESULTS.str());
+                if (resultNo >= 0) {
+                    app.append('-').append(String.valueOf(resultNo));
+                }
+                ++resultNo;
+                app.append("\": ");
+                return app;
+            }
+        };
+    }
+
+    public static SessionOutput.ResultDecorator createPostResultDecorator() {
+        return app -> app.append("\t,\n");
+    }
+
+    public static SessionOutput.ResultAppender createResultHandleAppender(String handleUrl) {
+        return (app, handle) -> app.append("\t\"").append(AbstractQueryApiServlet.ResultFields.HANDLE.str())
+                .append("\": \"").append(handleUrl).append(handle).append("\",\n");
+    }
+
+    public static SessionOutput.ResultAppender createResultStatusAppender() {
+        return (app, status) -> app.append("\t\"").append(AbstractQueryApiServlet.ResultFields.STATUS.str())
+                .append("\": \"").append(status).append("\",\n");
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ServletConstants.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ServletConstants.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ServletConstants.java
index a9d4e22..b815d76 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ServletConstants.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ServletConstants.java
@@ -22,8 +22,9 @@ public class ServletConstants {
     public static final String HYRACKS_CONNECTION_ATTR = "org.apache.asterix.HYRACKS_CONNECTION";
     public static final String HYRACKS_DATASET_ATTR = "org.apache.asterix.HYRACKS_DATASET";
     public static final String ASTERIX_APP_CONTEXT_INFO_ATTR = "org.apache.asterix.APP_CONTEXT_INFO";
-    public static final String EXECUTOR_SERVICE_ATTR = "org.apache.asterix.EXECUTOR_SERVICE_ATTR";
+    public static final String EXECUTOR_SERVICE_ATTR = "org.apache.asterix.EXECUTOR_SERVICE";
     public static final String RUNNING_QUERIES_ATTR = "org.apache.asterix.RUNINNG_QUERIES";
+    public static final String SERVICE_CONTEXT_ATTR = "org.apache.asterix.SERVICE_CONTEXT";
 
     private ServletConstants() {
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
index ecf2c53..a9d24b9 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
@@ -36,6 +36,7 @@ import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.asterix.translator.IStatementExecutorFactory;
 import org.apache.asterix.translator.SessionConfig;
 import org.apache.asterix.translator.SessionConfig.OutputFormat;
+import org.apache.asterix.translator.SessionOutput;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.job.JobSpecification;
 
@@ -99,16 +100,17 @@ public class AsterixJavaClient {
         List<Statement> statements = parser.parse();
         MetadataManager.INSTANCE.init();
 
-        SessionConfig conf = new SessionConfig(writer, OutputFormat.ADM, optimize, true, generateBinaryRuntime);
+        SessionConfig conf = new SessionConfig(OutputFormat.ADM, optimize, true, generateBinaryRuntime);
         conf.setOOBData(false, printRewrittenExpressions, printLogicalPlan, printOptimizedPlan, printJob);
         if (printPhysicalOpsOnly) {
             conf.set(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS, true);
         }
+        SessionOutput output = new SessionOutput(conf, writer);
 
-        IStatementExecutor translator = statementExecutorFactory.create(appCtx, statements, conf, compilationProvider,
+        IStatementExecutor translator = statementExecutorFactory.create(appCtx, statements, output, compilationProvider,
                 storageComponentProvider);
         translator.compileAndExecute(hcc, null, QueryTranslator.ResultDelivery.IMMEDIATE,
-                new IStatementExecutor.Stats());
+                null, new IStatementExecutor.Stats());
         writer.flush();
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CCExtensionManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CCExtensionManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CCExtensionManager.java
index 6c6f2af..0c6b2cc 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CCExtensionManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CCExtensionManager.java
@@ -110,11 +110,11 @@ public class CCExtensionManager implements IAlgebraExtensionManager {
         return statementExecutorFactory;
     }
 
-    public ILangCompilationProvider getAqlCompilationProvider() {
-        return aqlCompilationProvider;
-    }
-
-    public ILangCompilationProvider getSqlppCompilationProvider() {
-        return sqlppCompilationProvider;
+    public ILangCompilationProvider getCompilationProvider(Language lang) {
+        switch (lang) {
+            case AQL: return aqlCompilationProvider;
+            case SQLPP: return sqlppCompilationProvider;
+            default: throw new IllegalArgumentException(String.valueOf(lang));
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
new file mode 100644
index 0000000..defa180
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.app.message;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.algebra.base.ILangExtension;
+import org.apache.asterix.api.http.server.ResultUtil;
+import org.apache.asterix.app.cc.CCExtensionManager;
+import org.apache.asterix.common.api.IClusterManagementWork;
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
+import org.apache.asterix.compiler.provider.ILangCompilationProvider;
+import org.apache.asterix.hyracks.bootstrap.CCApplication;
+import org.apache.asterix.lang.aql.parser.TokenMgrError;
+import org.apache.asterix.lang.common.base.IParser;
+import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.messaging.CCMessageBroker;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.runtime.utils.ClusterStateManager;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutorContext;
+import org.apache.asterix.translator.IStatementExecutorFactory;
+import org.apache.asterix.translator.SessionConfig;
+import org.apache.asterix.translator.SessionOutput;
+import org.apache.hyracks.api.application.ICCServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+
+public final class ExecuteStatementRequestMessage implements ICcAddressedMessage {
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOGGER = Logger.getLogger(ExecuteStatementRequestMessage.class.getName());
+
+    private final String requestNodeId;
+
+    private final long requestMessageId;
+
+    private final ILangExtension.Language lang;
+
+    private final String statementsText;
+
+    private final SessionConfig sessionConfig;
+
+    private final IStatementExecutor.ResultDelivery delivery;
+
+    private final String clientContextID;
+
+    private final String handleUrl;
+
+    public ExecuteStatementRequestMessage(String requestNodeId, long requestMessageId, ILangExtension.Language lang,
+            String statementsText, SessionConfig sessionConfig, IStatementExecutor.ResultDelivery delivery,
+            String clientContextID, String handleUrl) {
+        this.requestNodeId = requestNodeId;
+        this.requestMessageId = requestMessageId;
+        this.lang = lang;
+        this.statementsText = statementsText;
+        this.sessionConfig = sessionConfig;
+        this.delivery = delivery;
+        this.clientContextID = clientContextID;
+        this.handleUrl = handleUrl;
+    }
+
+    @Override
+    public void handle(ICcApplicationContext ccAppCtx) throws HyracksDataException, InterruptedException {
+        ICCServiceContext ccSrvContext = ccAppCtx.getServiceContext();
+        ClusterControllerService ccSrv = (ClusterControllerService) ccSrvContext.getControllerService();
+        CCApplication ccApp = (CCApplication) ccSrv.getApplication();
+        CCMessageBroker messageBroker = (CCMessageBroker) ccSrvContext.getMessageBroker();
+        CCExtensionManager ccExtMgr = (CCExtensionManager) ccAppCtx.getExtensionManager();
+        ILangCompilationProvider compilationProvider = ccExtMgr.getCompilationProvider(lang);
+        IStorageComponentProvider storageComponentProvider = ccAppCtx.getStorageComponentProvider();
+        IStatementExecutorFactory statementExecutorFactory = ccApp.getStatementExecutorFactory();
+        IStatementExecutorContext statementExecutorContext = ccApp.getStatementExecutorContext();
+
+        ccSrv.getExecutor().submit(() -> {
+            ExecuteStatementResponseMessage responseMsg = new ExecuteStatementResponseMessage(requestMessageId);
+
+            try {
+                final IClusterManagementWork.ClusterState clusterState = ClusterStateManager.INSTANCE.getState();
+                if (clusterState != IClusterManagementWork.ClusterState.ACTIVE) {
+                    throw new IllegalStateException("Cannot execute request, cluster is " + clusterState);
+                }
+
+                IParser parser = compilationProvider.getParserFactory().createParser(statementsText);
+                List<Statement> statements = parser.parse();
+
+                StringWriter outWriter = new StringWriter(256);
+                PrintWriter outPrinter = new PrintWriter(outWriter);
+                SessionOutput.ResultDecorator resultPrefix = ResultUtil.createPreResultDecorator();
+                SessionOutput.ResultDecorator resultPostfix = ResultUtil.createPostResultDecorator();
+                SessionOutput.ResultAppender appendHandle = ResultUtil.createResultHandleAppender(handleUrl);
+                SessionOutput.ResultAppender appendStatus = ResultUtil.createResultStatusAppender();
+                SessionOutput sessionOutput = new SessionOutput(sessionConfig, outPrinter, resultPrefix, resultPostfix,
+                        appendHandle, appendStatus);
+
+                IStatementExecutor.ResultMetadata outMetadata = new IStatementExecutor.ResultMetadata();
+
+                MetadataManager.INSTANCE.init();
+                IStatementExecutor translator = statementExecutorFactory.create(ccAppCtx, statements, sessionOutput,
+                        compilationProvider, storageComponentProvider);
+                translator.compileAndExecute(ccAppCtx.getHcc(), null, delivery, outMetadata,
+                        new IStatementExecutor.Stats(), clientContextID, statementExecutorContext);
+
+                outPrinter.close();
+                responseMsg.setResult(outWriter.toString());
+                responseMsg.setMetadata(outMetadata);
+            } catch (TokenMgrError | org.apache.asterix.aqlplus.parser.TokenMgrError pe) {
+                GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, pe.getMessage(), pe);
+                responseMsg.setError(pe);
+            } catch (AsterixException pe) {
+                GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, pe.getMessage(), pe);
+                responseMsg.setError(new AsterixException(pe.getMessage()));
+            } catch (Exception e) {
+                String estr = e.toString();
+                GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, estr, e);
+                responseMsg.setError(new Exception(estr));
+            }
+
+            try {
+                messageBroker.sendApplicationMessageToNC(responseMsg, requestNodeId);
+            } catch (Exception e) {
+                LOGGER.log(Level.WARNING, e.toString(), e);
+            }
+        });
+    }
+
+    @Override
+    public String toString() {
+        return String.format("%s(id=%s, from=%s): %s", getClass().getSimpleName(), requestMessageId, requestNodeId,
+                statementsText);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java
new file mode 100644
index 0000000..4f9aa0c
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.app.message;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.asterix.common.messaging.api.MessageFuture;
+import org.apache.asterix.messaging.NCMessageBroker;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public final class ExecuteStatementResponseMessage implements INcAddressedMessage {
+    private static final long serialVersionUID = 1L;
+
+    public static final long DEFAULT_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(5);
+
+    private final long requestMessageId;
+
+    private String result;
+
+    private IStatementExecutor.ResultMetadata metadata;
+
+    private Throwable error;
+
+    public ExecuteStatementResponseMessage(long requestMessageId) {
+        this.requestMessageId = requestMessageId;
+    }
+
+    @Override
+    public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
+        NCMessageBroker mb = (NCMessageBroker) appCtx.getServiceContext().getMessageBroker();
+        MessageFuture future = mb.deregisterMessageFuture(requestMessageId);
+        if (future != null) {
+            future.complete(this);
+        }
+    }
+
+    public Throwable getError() {
+        return error;
+    }
+
+    public void setError(Throwable error) {
+        this.error = error;
+    }
+
+    public String getResult() {
+        return result;
+    }
+
+    public void setResult(String result) {
+        this.result = result;
+    }
+
+    public IStatementExecutor.ResultMetadata getMetadata() {
+        return metadata;
+    }
+
+    public void setMetadata(IStatementExecutor.ResultMetadata metadata) {
+        this.metadata = metadata;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("%s(id=%s): %d characters", getClass().getSimpleName(), requestMessageId,
+                result != null ? result.length() : 0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java
index 7ed3aef..452d13e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java
@@ -24,10 +24,11 @@ import java.io.IOException;
 import java.io.StringWriter;
 import java.nio.ByteBuffer;
 
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.translator.IStatementExecutor.Stats;
 import org.apache.asterix.translator.SessionConfig;
+import org.apache.asterix.translator.SessionOutput;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksAppendable;
 import org.apache.hyracks.api.comm.IFrame;
@@ -47,6 +48,7 @@ public class ResultPrinter {
 
     private final FrameManager resultDisplayFrameMgr;
 
+    private final SessionOutput output;
     private final SessionConfig conf;
     private final Stats stats;
     private final ARecordType recordType;
@@ -62,8 +64,9 @@ public class ResultPrinter {
     private ObjectMapper om;
     private ObjectWriter ow;
 
-    public ResultPrinter(ICcApplicationContext appCtx, SessionConfig conf, Stats stats, ARecordType recordType) {
-        this.conf = conf;
+    public ResultPrinter(IApplicationContext appCtx, SessionOutput output, Stats stats, ARecordType recordType) {
+        this.output = output;
+        this.conf = output.config();
         this.stats = stats;
         this.recordType = recordType;
         this.indentJSON = conf.is(SessionConfig.FORMAT_INDENT_JSON);
@@ -112,18 +115,18 @@ public class ResultPrinter {
         // If we're outputting CSV with a header, the HTML header was already
         // output by displayCSVHeader(), so skip it here
         if (conf.is(SessionConfig.FORMAT_HTML)) {
-            conf.out().println("<h4>Results:</h4>");
-            conf.out().println("<pre class=\"result-content\">");
+            output.out().println("<h4>Results:</h4>");
+            output.out().println("<pre class=\"result-content\">");
         }
 
         try {
-            conf.resultPrefix(new AlgebricksAppendable(conf.out()));
+            output.resultPrefix(new AlgebricksAppendable(output.out()));
         } catch (AlgebricksException e) {
             throw new HyracksDataException(e);
         }
 
         if (conf.is(SessionConfig.FORMAT_WRAPPER_ARRAY)) {
-            conf.out().print("[ ");
+            output.out().print("[ ");
             wrapArray = true;
         }
 
@@ -134,29 +137,29 @@ public class ResultPrinter {
             if (quoteRecord) {
                 StringWriter sw = new StringWriter();
                 appendCSVHeader(sw, recordType);
-                conf.out().print(JSONUtil.quoteAndEscape(sw.toString()));
-                conf.out().print("\n");
+                output.out().print(JSONUtil.quoteAndEscape(sw.toString()));
+                output.out().print("\n");
                 notFirst = true;
             } else {
-                appendCSVHeader(conf.out(), recordType);
+                appendCSVHeader(output.out(), recordType);
             }
         }
     }
 
     private void printPostfix() throws HyracksDataException {
-        conf.out().flush();
+        output.out().flush();
         if (wrapArray) {
-            conf.out().println(" ]");
+            output.out().println(" ]");
         }
         try {
-            conf.resultPostfix(new AlgebricksAppendable(conf.out()));
+            output.resultPostfix(new AlgebricksAppendable(output.out()));
         } catch (AlgebricksException e) {
             throw new HyracksDataException(e);
         }
         if (conf.is(SessionConfig.FORMAT_HTML)) {
-            conf.out().println("</pre>");
+            output.out().println("</pre>");
         }
-        conf.out().flush();
+        output.out().flush();
     }
 
     private void displayRecord(String result) throws HyracksDataException {
@@ -177,7 +180,7 @@ public class ResultPrinter {
             // TODO(tillw): this is inefficient as well
             record = JSONUtil.quoteAndEscape(record);
         }
-        conf.out().print(record);
+        output.out().print(record);
         stats.setCount(stats.getCount() + 1);
         // TODO(tillw) fix this approximation
         stats.setSize(stats.getSize() + record.length());
@@ -211,7 +214,7 @@ public class ResultPrinter {
                 }
                 String result = new String(frameBytes, start, length, UTF_8);
                 if (wrapArray && notFirst) {
-                    conf.out().print(", ");
+                    output.out().print(", ");
                 }
                 notFirst = true;
                 displayRecord(result);


Mime
View raw message